JSON-based Apache Spark framework

We at Arrah Technology are the creators of osDQ – https://sourceforge.net/projects/dataquality/ – the world's first open source data quality product.

We are now working on a JSON-based Apache Spark framework where:

  • Data processing steps can be written in JSON
  • You do not need a Spark developer to write the code; the framework takes care of it
  • Business analysts can write the JSON
  • The JSON layer has many utilities for smooth transfer of complex SQL queries to JSON
  • We will later add a UI so that building the JSON is smooth and seamless

Here is a sample JSON:

{
  "sparkconfig": 
    {
      "appname": "TransformRunner",
      "master":"local[3]",
      "verbose":"true",
      "noexecutor":"3",
      "nocore":"3",
      "executormemory":"3G",
      "confparam":"spark.speculation,false,spark.hadoop.mapreduce.map.speculative,false,spark.hadoop.mapreduce.reduce.speculative,false"
    },
  "datasources": [
    {
      "name": "SampleData",
      "location":"/Users/vsingh007c/Downloads/drive-download-20180718T060717Z-001/cdc/modified.csv",
      "locationType": "static",
      "format": "csv",
      "selectedColumns": []
    },
    {
      "name": "SampleDataJDBC",
      "format": "jdbc",
      "jdbcparam":"url,jdbc:mysql://localhost:3306/mysql,driver,com.mysql.jdbc.Driver,user,root,password,root,dbtable,(select * from help_category) AS T,partitionColumn,parent_category_id,lowerBound,0,upperBound,10000,numPartitions,10",
      "selectedColumns": []
    }
  ],
  "transformations": [
    {
      "name": "SampleData",
      "type": "sampling",
      "source": "SampleData",
      "priority": 1,
      "cache": false,
      "conditions": [
        {
          "condition": "random",
          "value": ["0.20"]
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "transformation",
      "source": "SampleData",
      "priority": 2,
      "cache": false,
      "conditions": [
        {
          "condition":"UDF",
          "funcname":"concat",
          "value": [ "col1","col2"],
          "column": "testcolumnUDF",
          "datatype": "string"
        },
        {
          "condition":"UDF",
          "funcname":"regex_replace",
          "value": ["col6"],
          "column": "regexcolumnUDF",
          "aggrcondition": "N.*,vivek",
          "datatype": "string"
        },
        {
          "condition":"normalise",
          "funcname":"zeroscore",
          "value": ["col10"],
          "column": "normcol10",
          "datatype": "Long"
        },
        {
          "condition":"normalise",
          "funcname":"zscore",
          "value": ["col10"],
          "column": "zcol10",
          "datatype": "Long"
        }
      ]
    },
    {
      "name": "SampleData-Filter",
      "type": "filter",
      "source": "SampleData",
      "priority": 3,
      "cache": false,
      "conditions": [
        {
          "condition": "Expression",
          "value": ["constant","col5 is null"],
          "datatype": "String"
        },
        {
          "condition": "RLIKE",
          "column":"col6",
          "value": ["constant","^N"],
          "datatype": "String"
        },
        {
          "condition": "DROPDUPLICATES",
          "value": ["constant","col6","col10"],
          "datatype": "String"
        },
        {
          "condition": "selectexpr",
          "value": ["constant","col6","col10 as none", "'5'", "col10*col10","*"],
          "datatype": "String"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 4,
      "cache": false,
      "conditions": [
        {
          "condition": "addcolumns",
          "aggrcondition": "testcolumn1:trim(col10):substrcol1:substr(now(),1,4):newcolumn:concat(testcolumn1,col4)"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "join",
      "source": "SampleData,SampleData",
      "priority": 5,
      "cache": false,
      "conditions": [
        {
          "joinType": "Unionall"
        },
        {
          "joinType": "Intersect"
        }
      ]
    },
    {
      "name": "StratifiedData",
      "type": "sampling",
      "source": "SampleData",
      "priority": 6,
      "cache": false,
      "conditions": [
        {
          "condition": "stratified",
          "value": ["0.002","I","0.8"],
          "column": "col2"
        }
      ]
    },
    
    {
      "name": "Groupdata",
      "type": "aggregate",
      "source": "StratifiedData",
      "priority": 7,
      "cache": false,
      "conditions": [
        {
          "condition": "groupby",
          "value": [ "col2"],
          "aggrcondition": "col2,count"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 8,
      "cache": false,
      "conditions": [
        {
          "condition": "renamecolumns",
          "aggrcondition": "testcolumn1,rename1_vivek,substrcol1,rename2,newcolumn,rename3"
        },
        {
          "condition": "reordercolumns",
          "aggrcondition": "rename1_vivek,rename2,rename3"
        },
        {
          "condition": "replacecolumns",
          "aggrcondition": "rename3:5"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "join",
      "source": "SampleData,SampleData",
      "priority": 9,
      "cache": false,
      "conditions": [
        {
          "joinType": "left_outer",
          "leftcolumn": "rename1_vivek,rename2,rename3",
          "condition": "not equal",
          "rightcolumn": "rename1_vivek,rename2",
          "dropcolumn": "rename2"
        }
      ]
    },
    {
      "name": "SampleData",
      "type": "enrichment",
      "source": "SampleData",
      "priority": 10,
      "cache": false,
      "conditions": [
        {
          "condition": "renameallcolumns",
          "aggrcondition": "sample_"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol:null"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol2:case when(newcol == null) then 'not null' else 'null value' END"
        },
        {
          "condition": "addcolumns",
          "aggrcondition": "newcol1:if(newcol2='not null',0,5*5)"
        }
      ]
    }
  ],
  "outputs": [
    {
      "name": "BASE_TABLE",
      "location": "/Users/vsingh007c/Downloads/",
      "sources": ["UDFData-Filter-newCol"],
      "format": "parquet",
      "savemode": "append",
      "selectedColumns": []
    }
  ]
}

Soon we will publish a README and open source the framework code. If you have any requirement or use case where you want to migrate data processing or migrate your old SQL to Spark, feel free to contact us at support@arrahtech.com.


Is ML/AI for big companies only?

Recently, a friend of mine who owns an SME (Small and Mid-Size Enterprise) asked me – “Is ML/AI for big companies only? Can we (SMEs) also benefit from the Machine Learning and Artificial Intelligence wave?”

I asked – “Why do you think you will not benefit?”

“Most use cases talk about ‘terabytes of data, complex algorithms and superhuman data scientists’, which are beyond an SME's reach. So I guess it is for big companies only” – he replied.

This may be the impression most SME owners carry, but it is far from the truth. In fact, it is the other way around: the impact of ML/AI will be more pronounced on SMEs, while on big companies it will average out.

Here are the reasons:

1.) Most ML/AI algorithms need 50-100 data points to create a model. So it is not terabytes; even KBs and MBs of data are good enough for model inputs.

2.) A segment of 50-100 members is good enough to define segment behavior. So a customer base of 500+ is a good use case for segment analysis.

3.) Weather and local events have a larger effect on local businesses. It is easier to track their impact on SMEs; for a large corporation it averages out.

4.) There are fewer data silos in SMEs, so unlocking data is easier. Data is well understood in the SME world, while in big corporations it is very complex and often riddled with politics.

5.) SME owners understand their business in totality, while executives of big corporations know only one part of the business. Better business understanding and well-defined metrics lead to better models.

6.) Better personalized service is possible for SMEs, where segment sizes are small, customers are local, and loyalty has a major impact on business.

7.) Tools and the cloud have now taken the cost barrier away for SMEs. They should be using AI/ML extensively to drive their business.

Vivek Singh – data architect, open source evangelist, chief contributor of the Open Source Data Quality project http://sourceforge.net/projects/dataquality/

Author of the fiction book “The Reverse Journey” http://www.amazon.com/Reverse-Journey-Vivek-Kumar-Singh/dp/9381115354/

Sampling using Apache Spark 2.1.1

There has been a debate in the big data community: “Do we need sampling in the big data world?” The argument is that big data platforms now have the storage and processing power to analyze the full dataset, so sampling is not required.

This argument holds true for data discovery, where analysis on the full dataset gives more confidence and every anomaly and pattern is captured. However, sampling still saves a considerable amount of time in dimension reduction, correlation and model generation. You have to go through hundreds of attributes (with all permutations and combinations) to find dependent, independent and correlated variables, and a representative dataset saves you hours with almost the same accuracy.

We have open sourced the data sampling code using Apache Spark 2.1.1 at https://sourceforge.net/projects/apache-spark-osdq/

Random sampling: on Dataset<Row>, random sampling is provided by Apache Spark out of the box; the user provides the fraction needed for sampling.

Dataset<Row> org.apache.spark.sql.Dataset.sample(boolean withReplacement, double fraction)
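
For illustration, here is a minimal random-sampling sketch; the input path, header option and 0.20 fraction are assumptions, not part of the original post:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RandomSamplingExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("RandomSamplingExample")
                .master("local[3]")
                .getOrCreate();

        // illustrative input; any Dataset<Row> can be sampled the same way
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", "true")
                .load("/path/to/input.csv");

        // keep roughly 20% of the rows, without replacement
        Dataset<Row> sampled = df.sample(false, 0.20);
        sampled.write().format("csv").save("/path/to/output");

        spark.stop();
    }
}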

Stratified sampling: Dataset<Row> does not provide stratified sampling, so the dataset is converted into a JavaPairRDD keyed by the column to be stratified, and then sampleByKeyExact is used. It makes multiple passes over the data to find the exact fraction.

for (Row uniqueKey : uniqueKeys) {
    // every unique key gets the requested sampling fraction
    fractionsMap.merge(uniqueKey.mkString(), fraction, (v1, v2) -> v1);
}

// convert the Dataset<Row> into a pair RDD keyed by the stratification column
JavaPairRDD<String, Row> dataPairRDD = SparkHelper.dfToPairRDD(keyColumn, df);
// sampleByKeyExact makes multiple passes to hit the exact per-key fraction
JavaRDD<Row> sampledRDD = dataPairRDD.sampleByKeyExact(false, fractionsMap).values();
Dataset<Row> sampledDF = df.sqlContext().createDataFrame(sampledRDD, df.schema());

return sampledDF;

Keylist sampling: this is like stratified sampling, but it makes only one pass, so the per-key fraction is met only approximately.

JavaRDD<Row> sampledRDD = dataPairRDD.sampleByKey(false, fractionsMap).values();
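
A minimal end-to-end sketch of keylist sampling, assuming the caller supplies the key column name and a per-key fraction map; the mapToPair call below stands in for the project's SparkHelper.dfToPairRDD helper:

import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import scala.Tuple2;

public class KeylistSamplingSketch {

    public static Dataset<Row> sampleByKeylist(Dataset<Row> df, String keyColumn,
                                               Map<String, Double> fractionsMap) {
        // key each row by the chosen column (stand-in for SparkHelper.dfToPairRDD)
        JavaPairRDD<String, Row> dataPairRDD = df.toJavaRDD()
                .mapToPair(row -> new Tuple2<>(row.getAs(keyColumn).toString(), row));

        // single pass; per-key fractions are met only approximately
        JavaRDD<Row> sampledRDD = dataPairRDD.sampleByKey(false, fractionsMap).values();
        return df.sqlContext().createDataFrame(sampledRDD, df.schema());
    }
}

With the keymapping file shown below, fractionsMap would simply contain vivek=0.1 and singh=0.1.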

Command line options:

 -c,--keyColumn <arg>          Key column for stratified/keylist sampling
 -f,--fraction <arg>           Sample fraction size
 -fm,--fractionMapping <arg>   Comma-separated pairs of key,fraction size
 -h,--help                     Show this help
 -i,--input <arg>              Input folder/file path
 -if,--inputFormat <arg>       Input file format
 -o,--output <arg>             Output folder path
 -of,--outFormat <arg>         Output file format
 -t,--type <arg>               Sampling type - random/stratified/keylist

example: "-f 0.2 -i ./testfile -if csv -o ./outputFile -of csv -t stratified -c key1 -fm ./keymapping"

testfile:

key1,key2
vivek,1
vivek,1
vivek,1
vivek,1
vivek,1
vivek,1
vivek,1
vivek,1
singh,2
singh,2
singh,2
singh,2
singh,2
singh,2
singh,2
singh,2
singh,2

keymapping:

vivek,0.1
singh,0.1

How to do a Diff of Spark DataFrames

Apache Spark does not provide a diff or subtract method for DataFrames. However, doing a diff of DataFrames is a common requirement – especially where data engineers have to find out what changed from the previous values (the previous DataFrame).

The requirement generally covers the following use cases:

a.) Find the diff (subtract) over complete DataFrames

b.) Find the diff (subtract) using a primary key (single column)

c.) Find the diff (subtract) using a composite key (multiple columns)

Since DataFrame does not have a subtract method, these are the steps you need to follow:

i) First convert the DataFrame to an RDD, keeping the DataFrame's schema safe.

ii) Create a paired RDD of key-value pairs (for use cases b and c).

iii) Use the subtract (or subtractByKey) method of the RDD, then apply the saved schema to the result.

iv) Get back your DataFrame.

	// find the diff between two data sets A -B
	public DataFrame findDiff ( DataFrame left, DataFrame right) {
		if (left == null || right == null ) {
			return null;
		}
		StructType schema = left.schema();
		JavaRDD<Row> leftRDD = left.toJavaRDD();
		JavaRDD<Row> rightRDD = right.toJavaRDD();
		
		// diff: rows that are in right but not in left (deleted values)
		JavaRDD<Row> diffRDD = rightRDD.subtract(leftRDD);
		DataFrame newdf = sqlContext.createDataFrame(diffRDD, schema);
		
		return newdf;
		
	}
	
	// find the diff between two data sets A -B using colname
	public DataFrame findDiff ( DataFrame left, String leftCol, DataFrame right,  String rightCol) {
		if (left == null || right == null ) {
			return null;
		}
		StructType schema = right.schema();
		JavaRDD<Row> leftRDD = left.toJavaRDD();
		JavaRDD<Row> rightRDD = right.toJavaRDD();
		String[] leftColName = left.columns();
		String[] rightColName = right.columns();
		int leftI=0; int rightI=0;
		for (int i=0 ; i < leftColName.length; i++)
			if (leftCol.equals(leftColName[i])) {
				leftI = i; break;
			}
		for (int i=0 ; i < rightColName.length; i++)
			if (rightCol.equals(rightColName[i])) {
				rightI = i; break;
			}
		final int leftIf = leftI;
		final int rightIf = rightI;

		// Now create paired RDDs for subtractByKey
		JavaPairRDD<String, Row> leftPair = leftRDD.mapToPair(new PairFunction<Row, String, Row>() {
			private static final long serialVersionUID = 1L;

			public Tuple2<String, Row> call(Row row) throws Exception {
				return new Tuple2<String, Row>(row.get(leftIf).toString(), row);
			}
		}).cache();
		
		JavaPairRDD<String, Row> rightPair = rightRDD.mapToPair(new PairFunction<Row, String, Row>() {
			private static final long serialVersionUID = 1L;

			public Tuple2<String, Row> call(Row row) throws Exception {
				return new Tuple2<String, Row>(row.get(rightIf).toString(), row);
			}
		}).cache();
		
		// diff: keys that are in right but not in left (deleted values)
		// apply the schema of right
		JavaPairRDD<String, Row> diffRDD = rightPair.subtractByKey(leftPair);
		JavaRDD<Row> newdataframe= diffRDD.values();
		DataFrame newdf = sqlContext.createDataFrame(newdataframe, schema);
		
		return newdf;
		
	}
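
A hypothetical usage sketch, assuming previous and current are two DataFrames with the same schema loaded elsewhere, and "id" is an illustrative key column:

		// use case a: complete-row diff - rows of current that are not present in previous
		DataFrame newRows = findDiff(previous, current);

		// use case b: key-based diff - rows of current whose "id" does not appear in previous
		DataFrame newKeys = findDiff(previous, "id", current, "id");

For composite keys (use case c), the same pattern applies; one simple option is to build the pair key by concatenating the key columns before calling subtractByKey.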

Apache Spark ML for Data Quality

Apache Spark is becoming the de-facto standard for data processing. The Spark platform spans all aspects of the data lifecycle – ingestion, discovery, preparation and data science – with easy-to-use, developer-friendly APIs.

The availability of a large set of statistical and machine-learning-based, scalable algorithms in Spark will bring a new perspective to data quality and validation, where these algorithms are used for automatic, machine-driven data anomaly detection and correction. The algorithms themselves are not new, but bringing them into the data engineering and data architecture domain is new. R, SAS, MATLAB etc. were confined to data scientists and never became popular with data engineering teams.

Algorithms like Principal Component Analysis, Support Vector Machines, pairwise comparison, regression, edit distance and K-Means will play a critical role in automating data quality and data correction rules. In the following code, I have used the Spark MLlib linear regression model to replace nulls in column A based on regression values from column B (linear regression model) or from a set of columns (multilinear regression model).

This code snippet is for educational purposes only.

1.) Create a DataFrame on which you want to apply the rules – inputBean is the object that wraps it (see the sketch below)
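
The DataFrameProperty bean itself is not shown in this post; here is a minimal sketch of what it might look like, inferred from the getters used in the snippets below (the field names and constructor are assumptions):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// simple value object carrying the inputs for the regression-based rules
public class DataFrameProperty {

	private final DataFrame df;
	private final String labelCol;      // column whose nulls will be replaced
	private final String regCol;        // column used for single-variable regression
	private final String[] inputCols;   // columns used for multilinear regression
	private final String uniqColName;   // unique key used to join the result back
	private final SQLContext sqlContext;

	public DataFrameProperty(DataFrame df, String labelCol, String regCol,
			String[] inputCols, String uniqColName, SQLContext sqlContext) {
		this.df = df;
		this.labelCol = labelCol;
		this.regCol = regCol;
		this.inputCols = inputCols;
		this.uniqColName = uniqColName;
		this.sqlContext = sqlContext;
	}

	public DataFrame getDataFrame()   { return df; }
	public String getLabelCol()       { return labelCol; }
	public String getRegCol()         { return regCol; }
	public String[] getInputCols()    { return inputCols; }
	public String getUniqColName()    { return uniqColName; }
	public SQLContext getSqlContext() { return sqlContext; }
}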

2.) Create and Train Model – Linear

public LinearRegressionModel doLinearReg(DataFrameProperty inputBean, int numIterations) {

	DataFrame df = inputBean.getDataFrame();
	String labelCol = inputBean.getLabelCol(); // replace nulls in this column
	String regCol = inputBean.getRegCol();     // use this column for regression
	DataFrame newdf = df.select(labelCol, regCol);

	JavaRDD<LabeledPoint> parseddata = newdf.javaRDD().map(new Function<Row, LabeledPoint>() {
		private static final long serialVersionUID = 1L;

		public LabeledPoint call(Row r) throws Exception {
			Object labVObj = r.get(0);
			Object regVarObj = r.get(1);
			if (labVObj != null && regVarObj != null) {
				double[] regv = new double[] { (Double) regVarObj };
				Vector regV = new DenseVector(regv);
				return new LabeledPoint((Double) labVObj, regV);
			} else {
				// rows with a null label or regressor fall back to a (0.0, 0.0) point
				double[] regv = new double[] { 0.0 };
				Vector regV = new DenseVector(regv);
				return new LabeledPoint(0.0, regV);
			}
		}
	});

	// Building the model
	return LinearRegressionWithSGD.train(parseddata.rdd(), numIterations);
}

3.) Use the model to replace null values

//LinearRegressionModel model = dqu.doLinearReg(inputBean, 20);
//System.out.println("\n Intercept: " + model.intercept());
//System.out.println("Weight :" + model.weights().toString());

public DataFrame replaceNull(DataFrameProperty inputBean, final double intercept, final double weight) {

	DataFrame df = inputBean.getDataFrame();
	String labelCol = inputBean.getLabelCol();
	String regCol = inputBean.getRegCol();
	String uniqCol = inputBean.getUniqColName();
	SQLContext sqlContext = inputBean.getSqlContext();
	DataFrame newdf = df.select(labelCol, regCol, uniqCol);

	JavaRDD<Row> parseddata = newdf.toJavaRDD().map(new FunctionMap(intercept, weight));

	// Generate the schema for the three replaced columns
	StructField[] fields = new StructField[3];
	fields[0] = DataTypes.createStructField(labelCol, DataTypes.DoubleType, true);
	fields[1] = DataTypes.createStructField(regCol, DataTypes.DoubleType, true);
	fields[2] = DataTypes.createStructField(uniqCol, DataTypes.DoubleType, true);
	StructType schema = DataTypes.createStructType(fields);
	DataFrame newdf1 = sqlContext.createDataFrame(parseddata, schema);

	// After the replacement, join back to the main DataFrame on the unique column
	DataFrame joined = df.join(newdf1, uniqCol);
	return joined;
}

4.) Create and Train Model – Multi Linear

public LinearRegressionModel doMultiLinearReg(DataFrameProperty inputBean, int numIterations) {

	DataFrame df = inputBean.getDataFrame();
	String labelCol = inputBean.getLabelCol();
	String[] inputCols = inputBean.getInputCols(); // multiple columns for regression
	DataFrame newdf = df.select(labelCol, inputCols);

	JavaRDD<LabeledPoint> parseddata = newdf.javaRDD().map(new Function<Row, LabeledPoint>() {
		private static final long serialVersionUID = 1L;

		public LabeledPoint call(Row r) throws Exception {
			Object labVObj = r.get(0);
			int colC = r.size();
			double[] regv = new double[colC - 1]; // -1 because index 0 is the label

			for (int i = 1; i < colC; i++) {
				Object regVarObj = r.get(i);
				if (regVarObj != null)
					regv[i - 1] = (Double) regVarObj;
				else
					regv[i - 1] = 0.0D; // null replaced with 0.0
			}
			Vector regV = new DenseVector(regv);
			if (labVObj != null) {
				return new LabeledPoint((Double) labVObj, regV);
			} else {
				return new LabeledPoint(0.0D, regV);
			}
		}
	});

	// Building the model
	return LinearRegressionWithSGD.train(parseddata.rdd(), numIterations);
}

5.) Use for null replacement

DataFrame newdf = dqu.replaceNull(inputBean,model.intercept(),model.weights().toArray()[0]);

6.) The FunctionMap class

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class FunctionMap implements java.io.Serializable, Function<Row, Row> {

	private static final long serialVersionUID = 1L;
	double _intercept, _weight;

	public FunctionMap(double intercept, double weight) {
		_intercept = intercept;
		_weight = weight;
	}

	public Row call(Row r) throws Exception {
		Object regvObj = r.get(1); // use get() so a null regressor does not throw
		if (r.get(0) == null && regvObj != null) {
			double regv = (Double) regvObj;
			double newVal = _intercept + _weight * regv;
			return RowFactory.create(newVal, regv, r.getDouble(2));
		} else {
			return r;
		}
	}
}