Sunday, July 12, 2015

Metrics on the fly – with Hadoop and Tableau

Let’s take a use case: customer segmentation. How do we segment our customer base? One common approach is to accumulate whatever dimensions and metrics we can find for each user, store them at customer granularity, and then put a reporting layer on top for analysts to create reports and visualizations.

There are numerous ways to do aggregations - one way is to build aggregates on top of a star schema. This approach can be costly to process in a database (ELT) or in an ETL tool, especially if the customer base is huge.

Some challenges in building aggregates:
1. Many full table scans are required - e.g. # transactions by user, # logins by user
2. The requirements for these metrics change frequently - we have to make frequent DDL and ETL changes

How about using Hadoop? Hadoop/Hive is good at handling huge datasets but weak on interactivity and ease of use, which is where Tableau excels. Let’s discuss how to marry these two technologies to create a metrics framework where we can add metrics on the fly and actually do data analysis. Assume the required base tables for deriving dimensions and calculating metrics are ingested into Hadoop (using Sqoop) and available as Hive tables.
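For concreteness, here is a hypothetical shape of these base tables (the column names are assumptions that match the sample queries later in this post):

-- Hypothetical Sqoop-ingested base tables referenced in the queries below
CREATE EXTERNAL TABLE customer_login (
  customer_id   int,
  CSTMR_STA_NM  string,   -- event/status name, e.g. 'LOGIN'
  chg_dt        string    -- event date
);

CREATE EXTERNAL TABLE user_transaction (
  customer_id   int,
  PYMT_ID       bigint    -- payment/transaction identifier
);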

The idea is to create a metrics (final fact) table with an id column and a single metrics column (map datatype) that bags all the metrics. Since the metrics column is a map, it can hold metric names and their corresponding values as key-value pairs. This way we are not constrained to predefined columns; we can add as many metrics as we go.

CREATE EXTERNAL TABLE `customer_metrics`(
  `customer_id` int,
  `metrics` map<string,string>)

Data Model

1.       Determine the granularity of the metrics table - in our case it's customer_id (it can also be multiple columns).
2.       Write Hive queries to compute the metrics at that granularity. Create separate Hive queries - one query for one or more metrics - and make sure they all share the same grain (e.g. Login_attempts, Last_login_date, Transaction_count); a standalone example follows this list.
3.       Create a staging table with 3 columns (id, key and value)
CREATE EXTERNAL TABLE `stg_customer_metrics`(
  `customer_id` int,
  `key` string,
  `value` string)
4.       Create the target metrics table 
CREATE EXTERNAL TABLE `customer_metrics`(
  `customer_id` int,
  `metrics` map<string,string>)
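
As a standalone example for step 2, here is a minimal sketch of a plain metric query at customer_id grain (it uses the user_transaction table and PYMT_ID column that appear in the full query later); the next section shows how such a query gets wrapped into the key-value format:

-- Standalone metric query at customer_id grain, before key-value wrapping
select customer_id,
       count(distinct PYMT_ID) as Transaction_count
from user_transaction
group by customer_id;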

ETL framework:

Now let’s create a scalable ETL framework that allows us to add metrics on the fly. The metrics query is going to be a union of any number of subqueries. Each subquery should follow these standards:
1.       It should include the granularity columns and an array of key-value pairs.
2.       Each key-value pair should be in the format ‘metric_name’=metric_value.
3.       This can be achieved by concatenating a static string to the aggregated value field.
4.       The result is a union of all the subqueries, each with the grain columns and an array of key-value pairs.
5.       Convert the array column into multiple rows using the Hive explode function.
6.       Split each key-value pair into two columns: key and value.
7.       Load the result of this entire query into the staging table created earlier.

INSERT OVERWRITE TABLE stg_customer_metrics
select
  customer_id as customer_id,
  split(keyvalue,'=')[0] as `key`,
  split(keyvalue,'=')[1] as `value`
from (

  select u.customer_id,  /* METRICS 1 & 2 - LOGIN ATTEMPTS AND LAST LOGIN DATE */
         array(
           concat_ws('=','Login_attempts',
                     cast(count(case when u.CSTMR_STA_NM = 'LOGIN' then 1 end) as STRING)),
           concat_ws('=','Last_login_date',
                     cast(max(u.chg_dt) as STRING))) as mp
  from customer_login u
  group by u.customer_id

  UNION ALL

  select u.customer_id,  /* METRIC 3 - TRANSACTION COUNT */
         array(
           concat_ws('=','Transaction_count',
                     cast(count(distinct u.PYMT_ID) as STRING))) as mp
  from user_transaction u
  group by u.customer_id  /* GRANULARITY: CUSTOMER_ID */

) b
LATERAL VIEW explode(mp) mptable as keyvalue
where 1=1
;
The stage table may look like:

Customer_id    Key                 Value
1123           Login_attempts      10
1123           Last_login_date     2015-07-07
1123           Transaction_count   2
9989           Login_attempts      20
9989           Last_login_date     2015-06-01
9989           Transaction_count   5
To add a new metric, write another select query on the same grain and append it to the union in the staging-table load, as sketched below.
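For instance, a hypothetical Support_tickets metric could be added like this (the support_case table and CASE_ID column are made up purely for illustration); the block is simply appended inside the subquery with another UNION ALL:

UNION ALL

select u.customer_id,  /* NEW METRIC: SUPPORT TICKET COUNT (hypothetical source) */
       array(
         concat_ws('=','Support_tickets',
                   cast(count(distinct u.CASE_ID) as STRING))) as mp
from support_case u
group by u.customer_id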

Load to final metrics table


-- UNION_MAP is a custom UDF (packaged in the jar below) that merges the
-- per-row maps into a single map per customer_id group.
ADD JAR udf-0.0.1-SNAPSHOT.jar;
INSERT OVERWRITE TABLE customer_metrics
select customer_id, UNION_MAP(MAP(`key`,`value`)) as metrics
from stg_customer_metrics
group by customer_id;
Customer_id    Metrics
1123           {"Login_attempts":"10","Last_login_date":"2015-07-07","Transaction_count":"2"}
9989           {"Login_attempts":"20","Last_login_date":"2015-06-01","Transaction_count":"5"}

View:

Create a view on top of this final table. It helps in:
1.     Creating reference metadata
2.     Abstracting the complexity of the map datatype and insulating reports from changes to the metrics table (you are free to add new metrics without impacting the reporting layer)
3.     Enabling easy drag and drop in the Tableau reporting layer
4.     Serving different views to different user groups

CREATE VIEW `vw_customer_metrics` AS
select `customer_id`
,`metrics`["Login_attempts"] as `Login_attempts`
,`metrics`["Last_login_date"] as `Last_login_date`
,`metrics`["Transaction_count"] as `Transaction_count`
from customer_metrics;
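
Through the view, metric names behave like ordinary columns; for example, an analyst (or Tableau custom SQL) could run:

-- The map datatype stays hidden behind the view.
select customer_id, Login_attempts, Transaction_count
from vw_customer_metrics
where cast(Transaction_count as int) > 0;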

Reporting layer (tableau):

Pull this view into Tableau as an extract so that reports are fast and interactive. You can add new calculations in the Tableau layer, especially for non-additive facts.
(If you are new to Tableau, I strongly suggest downloading Tableau Public at https://public.tableau.com/s/ and checking out the free training videos at http://www.tableau.com/learn/training - worthwhile for anyone who works with data.)
Bringing these metrics into the Tableau layer provides the following advantages that the Hadoop layer cannot:
1.    Interactivity (faster response)
2.    Slicing and dicing (without invoking MapReduce every time)
3.    Blending with other datasets (quickly, without involving IT!)
4.    Creating calculated columns
5.    Visualization (of course!)

Conclusion:


We designed this approach with the mindset of “business logic at the cost of resources” - implementing the business logic correctly matters more than squeezing out every bit of efficiency. Depending on the specific use case, we also need to think about file formats, compression and incremental-load logic. The Tableau layer is not effective on huge, detailed data, so aggregating the dataset and reducing the number of rows (and size) matters a lot during data analysis. Pushing the business logic down to the Hadoop layer abstracts the complexity and reduces clutter in the Tableau layer.
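For example, one way to act on the file-format and compression point is to store the final metrics table in a columnar, compressed format; a sketch (ORC with Snappy is just one reasonable choice):

-- Example: columnar storage with compression for the metrics table
CREATE TABLE customer_metrics_orc (
  customer_id int,
  metrics     map<string,string>
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");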