
How to do Diff of Spark dataframe

Apache Spark does not provide a diff or subtract method for DataFrames. However, diffing DataFrames is a common requirement – especially when data engineers have to find out what changed from the previous values (the previous DataFrame).

The requirement generally covers the following use cases:

a.) Find out diff (subtract) with complete dataframes

b.) Find out diff (subtract) with primary keys (Single column)

c.) Find out diff (subtract) with composite keys (Multiple columns)

Since DataFrame does not have a subtract method, these are the steps you need to follow:

i) First convert the DataFrame to an RDD, keeping the schema of the DataFrame safe.

ii) Create a paired RDD of key-value pairs for cases b and c.

iii) Use the subtract method of the RDD (subtractByKey for the paired RDD) and apply the schema on the resulting RDD.

iv) Get back your DataFrame.

	// find the diff between two data sets: rows present in right but not in left
	// (sqlContext is assumed to be a field of the enclosing class)
	public DataFrame findDiff(DataFrame left, DataFrame right) {
		if (left == null || right == null) {
			return null;
		}
		StructType schema = left.schema();
		JavaRDD<Row> leftRDD = left.toJavaRDD();
		JavaRDD<Row> rightRDD = right.toJavaRDD();

		// full-row subtract: right minus left
		JavaRDD<Row> diffRDD = rightRDD.subtract(leftRDD);
		DataFrame newdf = sqlContext.createDataFrame(diffRDD, schema);

		return newdf;
	}
	
	// find the diff between two data sets using key columns:
	// rows whose key is present in right but not in left
	public DataFrame findDiff(DataFrame left, String leftCol, DataFrame right, String rightCol) {
		if (left == null || right == null) {
			return null;
		}
		StructType schema = right.schema();
		JavaRDD<Row> leftRDD = left.toJavaRDD();
		JavaRDD<Row> rightRDD = right.toJavaRDD();
		String[] leftColName = left.columns();
		String[] rightColName = right.columns();

		// locate the index of the key column in each DataFrame
		int leftI = 0;
		int rightI = 0;
		for (int i = 0; i < leftColName.length; i++)
			if (leftCol.equals(leftColName[i])) {
				leftI = i; break;
			}
		for (int i = 0; i < rightColName.length; i++)
			if (rightCol.equals(rightColName[i])) {
				rightI = i; break;
			}
		final int leftIf = leftI;
		final int rightIf = rightI;

		// now create paired RDDs (key -> row) for subtractByKey
		JavaPairRDD<String, Row> leftPair = leftRDD.mapToPair(new PairFunction<Row, String, Row>() {
			private static final long serialVersionUID = 1L;

			public Tuple2<String, Row> call(Row row) throws Exception {
				return new Tuple2<String, Row>(row.get(leftIf).toString(), row);
			}
		}).cache();

		JavaPairRDD<String, Row> rightPair = rightRDD.mapToPair(new PairFunction<Row, String, Row>() {
			private static final long serialVersionUID = 1L;

			public Tuple2<String, Row> call(Row row) throws Exception {
				return new Tuple2<String, Row>(row.get(rightIf).toString(), row);
			}
		}).cache();

		// keys present in right but not in left; apply the schema of right
		JavaPairRDD<String, Row> diffRDD = rightPair.subtractByKey(leftPair);
		JavaRDD<Row> diffRows = diffRDD.values();
		DataFrame newdf = sqlContext.createDataFrame(diffRows, schema);

		return newdf;
	}
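
A minimal usage sketch (the DataFrames customersOld and customersNew and the key column "cust_id" below are hypothetical, and the methods are assumed to live in a class that holds the sqlContext field used above):

	// full-row diff: rows present in customersNew but not in customersOld
	DataFrame changedRows = findDiff(customersOld, customersNew);

	// key-based diff: rows whose cust_id exists in customersNew but not in customersOld
	DataFrame newCustomers = findDiff(customersOld, "cust_id", customersNew, "cust_id");
	newCustomers.show();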

osDQ releases Apache Spark based data quality

World’s first open source data quality and data preparation project (osDQ – https://sourceforge.net/projects/dataquality/ ) releases Apache Spark based data quality and data preparation modules for big data.

Apache Spark based APIs can be downloaded from here: https://sourceforge.net/projects/apache-spark-osdq/

This beta release has the following features:

Normalization:

  • ZScore

Functional input – mean and std dev

Return type : dataframe

  • ZeroScore (between 0 and 1)

Functional input – min and max

Return type : dataframe

  • RatioScore (num/denom)

Functional input – ratio number

Return type : dataframe

  • Subtraction Score (a – b)

Functional input – Subtraction number

Return type : dataframe
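
The normalization functions above are simple column transformations. As a rough illustration, a z-score is (value - mean) / stddev applied per column; below is a minimal sketch written directly against the Spark DataFrame API (the actual osDQ method signatures are not shown here, and the helper and column name are hypothetical):

	import org.apache.spark.sql.DataFrame;

	// Hypothetical helper: append a z-score normalized copy of a numeric column,
	// given its mean and standard deviation (the "functional inputs" above).
	public static DataFrame zScore(DataFrame df, String colName, double mean, double stdDev) {
		return df.withColumn(colName + "_zscore",
				df.col(colName).minus(mean).divide(stdDev));
	}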

Replacement:

  • Replacement with key-value pairs

Functional input – hashtable and columns type

Return type : dataframe

  • Replacement Null with default value

Functional input – value

Return type : dataframe

  • Replacement using regression value (linear and multi-linear)

Functional input – No of iterations

Return type : dataframe

Remove:

  • Removing Null Rows

Functional input – all or any

Return type : dataframe

  • Removing Duplicate Rows

Functional input – all or any

Return type : dataframe

Profiling:

Functional input – DataFrame

Return type: Hashtable<Colname, Hashtable<Key, Value>>

HashKeys – “count”,”unique”,”nullcount”,”pattern”,”min”,”max”
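
As an illustration of the kind of per-column statistics this returns, here is a minimal sketch using plain Spark aggregations (the real osDQ implementation is not shown here; the "pattern" key is omitted for brevity):

	import java.util.Hashtable;
	import org.apache.spark.sql.DataFrame;
	import org.apache.spark.sql.Row;
	import static org.apache.spark.sql.functions.*;

	// Hypothetical sketch: profile one column into a key/value map as described above.
	public static Hashtable<String, String> profileColumn(DataFrame df, String col) {
		Hashtable<String, String> stats = new Hashtable<String, String>();
		Row r = df.agg(count(col).as("count"), countDistinct(col).as("unique"),
				min(col).as("min"), max(col).as("max")).first();
		stats.put("count", String.valueOf(r.get(0)));
		stats.put("unique", String.valueOf(r.get(1)));
		stats.put("min", String.valueOf(r.get(2)));
		stats.put("max", String.valueOf(r.get(3)));
		// null count = total rows minus the non-null count of this column
		stats.put("nullcount", String.valueOf(df.count() - r.getLong(0)));
		return stats;
	}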

Fuzzy Join and Replacement :

Function Input – two strings

Return type – cosine similarity (between -1 and 1)
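
As a rough illustration of what a cosine-similarity string match computes (character-bigram term frequencies; this is only a sketch of the general idea, not the osDQ implementation – with non-negative counts the score lands between 0 and 1):

	import java.util.HashMap;
	import java.util.Map;

	// Cosine similarity over character-bigram counts of two strings.
	public static double cosineSimilarity(String a, String b) {
		Map<String, Integer> va = bigrams(a), vb = bigrams(b);
		double dot = 0, na = 0, nb = 0;
		for (Map.Entry<String, Integer> e : va.entrySet()) {
			Integer other = vb.get(e.getKey());
			if (other != null) dot += e.getValue() * other;
			na += e.getValue() * e.getValue();
		}
		for (int v : vb.values()) nb += v * v;
		return (na == 0 || nb == 0) ? 0.0 : dot / (Math.sqrt(na) * Math.sqrt(nb));
	}

	private static Map<String, Integer> bigrams(String s) {
		Map<String, Integer> counts = new HashMap<String, Integer>();
		for (int i = 0; i + 2 <= s.length(); i++) {
			String bg = s.substring(i, i + 2);
			Integer c = counts.get(bg);
			counts.put(bg, c == null ? 1 : c + 1);
		}
		return counts;
	}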

Summary: osDQ will enhance the project to provide more core APIs for data quality, data preparation and data science. It will save the community time in writing those functions for the big data environment.

 

 


Apache Spark ML for Data Quality

Apache Spark is becoming the de facto standard for data processing. The Spark platform spans all aspects of the data lifecycle – Ingestion, Discovery, Preparation and Data Science – with easy-to-use, developer-friendly APIs.

The availability of a large set of statistical and machine learning based, scalable algorithms in Spark will bring a new perspective to data quality and validation, where these algorithms are used for automatic, machine-based data anomaly detection and correction. These algorithms are not new, but bringing them into the data engineering and data architecture domain is new. R, SAS, MATLAB etc. were confined to data scientists and are not popular with data engineering.

Algorithms like Principal Component Analysis, Support Vector Machines, pairwise comparison, regression, edit distance and K-Means have a critical role to play in automating data quality and data correction rules. In the following code, I have used the Spark MLlib linear regression model to replace nulls in column A based on regression values from column B (linear regression model), and from a set of columns (multilinear regression model).

These code snippets are for educational purposes only.

1.) Create a DataFrame on which you want to apply the rules – inputBean is the object that carries it
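
The DataFrameProperty class itself is not shown in this post; based on the getters used in the snippets below, it is a simple input bean that looks roughly like this (a sketch, not the actual class):

	import org.apache.spark.sql.DataFrame;
	import org.apache.spark.sql.SQLContext;

	// Sketch of the input bean inferred from the getters used below.
	public class DataFrameProperty {
		private DataFrame dataFrame;   // the DataFrame to apply the rules on
		private String labelCol;       // column whose nulls will be replaced
		private String regCol;         // column used for single-column regression
		private String[] inputCols;    // columns used for multi-linear regression
		private String uniqColName;    // unique key used to join the result back
		private SQLContext sqlContext;

		public DataFrame getDataFrame() { return dataFrame; }
		public String getLabelCol() { return labelCol; }
		public String getRegCol() { return regCol; }
		public String[] getInputCols() { return inputCols; }
		public String getUniqColName() { return uniqColName; }
		public SQLContext getSqlContext() { return sqlContext; }
		// setters omitted for brevity
	}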

2.) Create and Train Model – Linear

public LinearRegressionModel doLinearReg(DataFrameProperty inputBean, int numIterations) {

	DataFrame df = inputBean.getDataFrame();
	String labelCol = inputBean.getLabelCol(); // replace nulls in this column
	String regCol = inputBean.getRegCol();     // use this column for regression
	DataFrame newdf = df.select(labelCol, regCol);

	JavaRDD<LabeledPoint> parseddata = newdf.javaRDD().map(new Function<Row, LabeledPoint>() {
		private static final long serialVersionUID = 1L;

		public LabeledPoint call(Row r) throws Exception {
			Object labVObj = r.get(0);
			Object regVarObj = r.get(1);
			if (labVObj != null && regVarObj != null) {
				double[] regv = new double[] { (Double) regVarObj };
				Vector regV = new DenseVector(regv);
				return new LabeledPoint((Double) labVObj, regV);
			} else {
				// rows with nulls are fed to the model as (0.0, 0.0)
				double[] regv = new double[] { 0.0 };
				Vector regV = new DenseVector(regv);
				return new LabeledPoint(0.0, regV);
			}
		}
	});

	// Building the model
	return LinearRegressionWithSGD.train(parseddata.rdd(), numIterations);
}

3.) Use model to replace Null Value

//LinearRegressionModel model = dqu.doLinearReg(inputBean, 20);

//System.out.println("\n Intercept: " + model.intercept());

//System.out.println("Weight: " + model.weights().toString());

public DataFrame replaceNull(DataFrameProperty inputBean, final double intercept, final double weight) {

	DataFrame df = inputBean.getDataFrame();
	String labelCol = inputBean.getLabelCol();
	String regCol = inputBean.getRegCol();
	String uniqCol = inputBean.getUniqColName();
	SQLContext sqlContext = inputBean.getSqlContext();
	DataFrame newdf = df.select(labelCol, regCol, uniqCol);

	JavaRDD<Row> parseddata = newdf.toJavaRDD().map(new FunctionMap(intercept, weight));

	// Generate the schema for the three selected columns
	StructField[] fields = new StructField[3];
	fields[0] = DataTypes.createStructField(labelCol, DataTypes.DoubleType, true);
	fields[1] = DataTypes.createStructField(regCol, DataTypes.DoubleType, true);
	fields[2] = DataTypes.createStructField(uniqCol, DataTypes.DoubleType, true);
	StructType schema = DataTypes.createStructType(fields);
	DataFrame newdf1 = sqlContext.createDataFrame(parseddata, schema);

	// After the replacement, join back to the main DataFrame on the unique column
	return df.join(newdf1, uniqCol);
}

4.) Create and Train Model – Multi Linear

public LinearRegressionModel doMultiLinearReg(DataFrameProperty inputBean, int numIterations) {

	DataFrame df = inputBean.getDataFrame();
	String labelCol = inputBean.getLabelCol();
	String[] inputCols = inputBean.getInputCols(); // multiple columns for regression
	DataFrame newdf = df.select(labelCol, inputCols);

	JavaRDD<LabeledPoint> parseddata = newdf.javaRDD().map(new Function<Row, LabeledPoint>() {
		private static final long serialVersionUID = 1L;

		public LabeledPoint call(Row r) throws Exception {
			Object labVObj = r.get(0);
			int colC = r.size();
			double[] regv = new double[colC - 1]; // -1 because the first column is the label

			for (int i = 1; i < colC; i++) {
				Object regVarObj = r.get(i);
				if (regVarObj != null)
					regv[i - 1] = (Double) regVarObj;
				else
					regv[i - 1] = 0.0D; // null replaced with 0.0
			}
			Vector regV = new DenseVector(regv);
			if (labVObj != null) {
				return new LabeledPoint((Double) labVObj, regV);
			} else {
				return new LabeledPoint(0.0D, regV);
			}
		}
	});

	// Building the model
	return LinearRegressionWithSGD.train(parseddata.rdd(), numIterations);
}

5.) Use for null replacement

DataFrame newdf = dqu.replaceNull(inputBean,model.intercept(),model.weights().toArray()[0]);

6.) Function Map class

public class FunctionMap implements java.io.Serializable, Function<Row, Row> {

	private static final long serialVersionUID = 1L;
	double _intercept, _weight;

	public FunctionMap(double intercept, double weight) {
		_intercept = intercept;
		_weight = weight;
	}

	public Row call(Row r) throws Exception {
		Object regvObj = r.get(1); // read as Object so a null value does not throw
		// replace a null label with the regression estimate: intercept + weight * x
		if (r.get(0) == null && regvObj != null) {
			double regv = (Double) regvObj;
			double newVal = _intercept + _weight * regv;
			return RowFactory.create(newVal, regv, r.getDouble(2));
		} else
			return r;
	}
}

Biggest Problem of Big Data – Entity Resolution

Big Data has gone past the PoC phase. Different companies are at different stages of implementation. Data Ingestion, Data Storage (Data Lake and EDW), Data Processing and Data Visualization processes have become quite mature, and there is plenty of open source and proprietary software to solve these problems.

One major hurdle Big Data faces today is Entity Resolution – defining a business entity from a multitude of data sources. In the EDW (Enterprise Data Warehouse) world, data was structured and sources were limited. The keys of the sources were also pre-defined and well defined, so RDBMS joins (inner, outer, left outer, right outer, semi join etc.) were enough to merge data from two different systems and tables.

In the Big Data world, there is hardly any key or attribute that runs across the sources. The keys of one system are also useless, as the other systems are completely independent of each other. So businesses have to define their own logic for merging data from different sources into one entity. To make it worse, RDBMS-style exact-match joins have to be replaced by fuzzy joins, as referential integrity across systems cannot be ensured.

The following is a practical approach to Entity Resolution:

1.) Pre-define your entities and the super set of attributes (coming from all data sources)

2.) Attributes may have multiple related values that should be mapped to the same attribute. Plan your storage accordingly (graph databases work fine for this relationship storage)

3.) Merge data from multiple sources using merge business logic to create a virtual entity with a sizable set of attributes (we used Apache Spark for this)

Mapping attributes

Location

  • Zip, County, State, Lat/long
  • Nearby locations (+/- area)  – high propensity area
  • IP location

Time

  • Date/time stamp
  • Nearby time stamp (+/-) – Event happening before or after a period

Attributes

  • String match (Fuzzy) – Name, Address, Cause
  • Cardinal Match  – events sharing same or similar key
  • IP Correlation (Primary IP, Secondary IP, IP from same ZIP code)
  • Other business logic related merging rules

You can make your merge model machine learning based, so it will be learning over time to do a more relevant merge.

4.) Right-size the merged columns

  • Remove transaction columns
  • Remove  database columns
  • Remove technical identifiable columns
  • Remove duplicate Columns

5.) Search for this entity in your relational graph database to rank similar entities above a threshold. If no result crosses the threshold, make a new entry in your entity table with the attributes of the entity being searched.

6.) Take the highest ranked entity and map the missing attributes from it. This entity is your final entity for business.

How to enhance entities with changing attributes:

Like any practical entity, the values of its attributes keep changing. Map the attribute values of the entity being searched to the highest ranked entity and look at the differences in values. Let's say the IP value of an entity keeps matching its secondary value over time; then the secondary IP value becomes primary and the primary becomes secondary. Or, if it is a new IP, add one more relationship node with the new values.

Ranking algorithm:

The business can assign different weightage to must-have, critical, important and good-to-have attributes and their matching thresholds. This model also matches against secondary or related values, which makes it more accurate. Let's say address is a must-match attribute. If a customer's other attributes match but the address does not, other models will reject the record; in this model, if it matches his or her office address or another known address, the record is boosted – which is the right outcome.
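
As a minimal illustration of this kind of weighted ranking (the attribute names, weights and threshold below are arbitrary assumptions; in practice they come from the business rules described above):

	import java.util.HashMap;
	import java.util.Map;

	// Hypothetical weighted attribute ranking: each matched attribute contributes its weight.
	public class EntityRanker {
		private static final Map<String, Double> WEIGHTS = new HashMap<String, Double>();
		static {
			WEIGHTS.put("address", 0.5); // must-have
			WEIGHTS.put("name", 0.3);    // critical
			WEIGHTS.put("ip", 0.2);      // good-to-have
		}

		// attributeMatches: attribute name -> whether it matched (directly or via a secondary value)
		public static double score(Map<String, Boolean> attributeMatches) {
			double score = 0.0;
			for (Map.Entry<String, Double> w : WEIGHTS.entrySet()) {
				if (Boolean.TRUE.equals(attributeMatches.get(w.getKey()))) {
					score += w.getValue();
				}
			}
			return score; // compare against a business-defined threshold, e.g. 0.7
		}
	}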


How to create Data Lake

As the data lake goes mainstream, the obvious questions are – a.) how to make it and b.) will it work?

The latter comes mostly from sponsors of failed Business Intelligence projects, who still feel the pain. (Here is my blog about it – https://viveksingh36.wordpress.com/2014/11/12/why-bi-projects-fail-and-the-role-of-data-architect/ )

Then there are further questions about time and resources. Do I need to throw away my EDW (enterprise data warehouse)? Is Hadoop (big data) a must for a data lake? What are the tools to create a data lake?

In this blog, I am trying to answer these questions from a practitioner's perspective.

DataLake is evolution, not disruption: As storage gets cheaper, data practitioners thought of co-locating raw data along with processed data so that data engineers and data scientists have quick access to the raw data. This shift changes “Extraction” to “Ingestion”, where data is loaded “AS IS”. This shift also brings some changes to ETL tools – see here ( https://viveksingh36.wordpress.com/2015/01/08/data-lake-and-e-t-l-big-data-plugins/ )

But it does not require throwing away your existing ETL jobs and EDW. If designed carefully, they can be reused in the data lake. Also, Hadoop is not a must. A data lake can be created on an RDBMS, NoSQL, HDFS or any file system. Hadoop/HDFS, being the cheapest, is the preferred choice, but if you have unlimited corporate licences for any of the above, you can use it for the data lake.

How to build it: Divide your data lake into 4 logical and physical spaces.


i) Raw Space: (a combination of the Extraction and Staging areas of a traditional warehouse, plus state information)

a.) load data “as is”

b.) define folder structure for different sources

c.) Meta Information : Time of load, load volume, load time, load status

d.) Useful for Auditing and Data Lineage

ii) Qualified Space: (a combination of the Transformation and Joining steps of a traditional warehouse, plus a data dictionary)

a.) run data quality and Entity Resolution on “as is” data

b.) define folder structure for different partitions ( time based, region based, verticals based)

c.) Meta Information : Data type, expected values, whether manipulable

d.) Useful for Insight and Prediction

iii) Data Warehouse Space: (you can reuse your existing EDW here)

a.) load recent data into EDW

b.) define reporting and discovery parameters

c.) Meta Information : Data Dictionary, Granularity and Latency

d.) Useful for operational reports and discovery

iv) Collaborative Space: (in this space you expose your data to 3rd parties or fetch data from 3rd parties)

https://viveksingh36.wordpress.com/2015/11/19/inbound-and-outbound-data-movement-from-data-lake/

a.) find out the data to be exposed

b.) define security and data format

c.) Meta Information : Data Dictionary, Aggregation level, data format

d.) Useful for data Monetization and Data Fusion

Conclusion: The above are general guidelines for creating a data lake. As every business is different, each data lake is different. The choice of tools and storage will vary according to your requirements. Though storage is cheap, processing huge amounts of data will need lots of CPU and RAM – which is serious money. So be careful with the volume of data. Though the general advice is to bring everything into the data lake, start small and see value before bringing everything in.

Inbound and Outbound data movement from Data Lake

Description: Enterprise data is sitting in the Enterprise Data Lake. Only internal employees have access to the data for analysis and insight. This paper discusses a conceptual framework where 3rd party data can be brought in for “fusion” with enterprise data to find new insights, and where enterprise data can be made available to 3rd parties for “monetization”.

Abstract: Businesses are in agreement that data sharing is good for the ecosystem. On one hand it increases the top line by adding a new revenue channel through monetizing data; on the other hand, hitherto unknown data brings new insight. The problems are:

  • technology is not ready to infuse massive 3rd party data in an automated way, and
  • the Data Lake can’t emit data to 3rd parties.

There are action items around information mapping, data curation, security and compliance, and data movement from internal storage to public-facing storage, which need to be addressed to enable the framework.

This paper discusses a conceptual framework (a process flow) which can be followed while creating an inbound and outbound data flow framework on the Data Lake.

Business Process Flow for Data Movement:

  • Information Mapping
  • Data Curation
  • Entity Resolution
  • Security and Compliance

 

 

  • Information Mapping :

a.) Business Entity discovery

b.) Business Attributes discovery

c.) Sharable attributes

d.)  Joinable attributes

e.) Aggregation level

f.) Time to share

g.) Time to fetch

  • Data Curation:

a.) Schema mapping

b.) Missing value replacement

c.) Dirty data drop

d.) Fuzzy Joining of data

e.) inbound and outbound dataset creation

  • Entity Resolution :

a.) Finding the business entities across data sets

b.) Finding attributes which can affect behavior of entities

c.) Logic of correlation

d.) Logic for join

  • Security & Compliance:

a.) Deletion of personally identifiable data

b.) Masking of critical data

c.) Validating compliance

Technical consideration:

a.) downloadable or API based,

b.) format of download

c.) choice of ETL tool

d.) choice of EAI tool

e.) internal storage Vs public facing storage

f.) scalability

Business consideration:

a.) What to expose

b.) Whom to expose it to – registered users or the public

c.) What is the monetization policy (by download, by advertisement or by API usage)?

 

Conclusion: Every business is unique. This paper tries to bring out a conceptual guideline on how to fuse 3rd party data into your system and how to monetize your data by providing it to 3rd parties. Companies are struggling to find a way to monetize data and also to bring more insight into their data.

A data sharing ecosystem will be a big boost to companies.

Entity Resolution and Event Correlation – Datalake DQ

DQ (Data Quality) historically started with missing values and then moved into address correction and data enrichment (geo-encoding, standardization etc.). Data Quality tools have successfully solved the traditional data quality problems, like the ones discussed above.

So far, DQ has been single source and single domain. With the advent of the data lake, DQ has to adopt a new strategy. Event Correlation and Entity Resolution are going to be crucial for data lake validation. DQ tools have to provide these two must-have features for the data lake.

Entity Resolution: A data lake will hold data from multiple sources and domains. It is critical to create the right entities from the data set. The following will be the prime components of Entity Resolution (ER):

a.) Fuzzy Join: we have so many joins supported today (inner, outer, left outer, semi, equi etc.), but they match exact values. Dimensions from multiple sources may not have an exact match (like name or address). A fuzzy join will match values which are similar but do not match exactly – like John Smith and John Smithe (see the sketch after this list).

b.) Algorithm for picking dimension values: The data lake will contain data from multiple CRMs and domains. While matching dimensional values, there will be conflicts about which one to pick – let's say Salesforce has one address, the sales mart has a different address, and the data you bought has yet another address. The entity should have one master address. The ER algorithm will pick the right value based on timeliness, validity of the source, most common occurrence etc.

c.) Entity Classification: Once the entity's unique id and master dimensions are identified, the next step involves classifying the entity using business rules. These entities may be outdated, inactive or of little relevance. Once an entity is classified and tagged, it can be used for further analysis or can be put in the historical data lake. An entity with missing critical dimensional values will be dumped in the dirty data lake for further investigation.
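
A minimal sketch of the fuzzy matching idea from item a.) above, using a normalized Levenshtein edit distance in plain Java (the 0.9 threshold is an arbitrary assumption; a production fuzzy join would apply a function like this as the join condition):

	// Normalized Levenshtein similarity: 1.0 means identical, 0.0 means completely different.
	public static double similarity(String a, String b) {
		int[][] d = new int[a.length() + 1][b.length() + 1];
		for (int i = 0; i <= a.length(); i++) d[i][0] = i;
		for (int j = 0; j <= b.length(); j++) d[0][j] = j;
		for (int i = 1; i <= a.length(); i++) {
			for (int j = 1; j <= b.length(); j++) {
				int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
				d[i][j] = Math.min(Math.min(d[i - 1][j] + 1, d[i][j - 1] + 1), d[i - 1][j - 1] + cost);
			}
		}
		int maxLen = Math.max(a.length(), b.length());
		return maxLen == 0 ? 1.0 : 1.0 - (double) d[a.length()][b.length()] / maxLen;
	}

	// similarity("John Smith", "John Smithe") is about 0.91, so a fuzzy join with a
	// 0.9 threshold would treat the two as the same dimension value.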

Event Correlation: Theoretically, an event is also an entity, but I am putting it under a different header because it is temporal in nature and the algorithms used for correlating events are different.

a.) Range Bound Correlation: Two correlated events will hardly ever occur at exactly the same time. One event will fire another event, which may lag in time or place or both. Along with the event identifier fields, a range-bound dimension will be used to correlate events. Business rules will decide the width of the boundary (see the sketch after this list).

b.) Event aggregation: An event can fire many sub-events and super-events. All these events have to be collapsed into one related event. Event Correlation (EC) algorithms will map all these events to the related event and cause, and bring them into a human-readable format.

c.) Noise reduction: An aggregated event may be a false event or noise. Business rules will decide which events should be carried forward (assuming they have a strong correlation with the business) and which should be dropped. Events will also go through business classification to rank their importance.
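
A minimal sketch of the range-bound correlation from item a.) above (plain Java; the event fields and the +/- 5 minute window are assumptions – the real width comes from business rules):

	import java.util.ArrayList;
	import java.util.List;

	// Hypothetical event with a correlation key and an epoch-millisecond timestamp.
	class Event {
		String correlationKey; // e.g. host id or customer id
		long timestampMillis;
		Event(String key, long ts) { correlationKey = key; timestampMillis = ts; }
	}

	public class RangeBoundCorrelator {
		// Pair events that share a key and occur within the given time window of each other.
		public static List<Event[]> correlate(List<Event> events, long windowMillis) {
			List<Event[]> pairs = new ArrayList<Event[]>();
			for (int i = 0; i < events.size(); i++) {
				for (int j = i + 1; j < events.size(); j++) {
					Event a = events.get(i), b = events.get(j);
					if (a.correlationKey.equals(b.correlationKey)
							&& Math.abs(a.timestampMillis - b.timestampMillis) <= windowMillis) {
						pairs.add(new Event[] { a, b });
					}
				}
			}
			return pairs; // e.g. correlate(events, 5 * 60 * 1000L) for a +/- 5 minute window
		}
	}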

Conclusion: The data lake will bring new challenges to Data Quality, which will go through a transformation to solve these new problems. DQ will move from:

i) Single Source –> Data Lake

ii) Structure Analysis –> Entity Mapping

iii) Operational –> Analytical