Wednesday, April 25, 2012

Secondary Sort using Python and MRJob

MRJob is an excellent module developed as an open-source project by Yelp. I chose mrjob because of the following features:


  1. It provides a seamless JSON reader and writer (i.e., the mapper can read JSON lines and convert them into Python lists/dicts)
  2. We can test the Hadoop job locally (on Windows or Unix) on a small dataset without actually touching huge HDFS files (quick!)
  3. It can orchestrate many mappers and reducers in the same code

My task is to parse JSON-formatted web log files; say the columns are sessionid, stepno and data. So the pseudo-code:
  1. Read the JSON files using the mrjob protocol
        DEFAULT_INPUT_PROTOCOL = 'json_value'
        DEFAULT_OUTPUT_PROTOCOL = 'repr_value'  # output is delimited

  2. Yield sessionid, (sessionid, stepno, data) from the mapper.
     MapReduce will make sure that all records with the same sessionid (key) go to the same reducer, with the remaining columns sent as a tuple (value) to make it easier for us to sort in the reducer.

  3. Use Python's built-in sorted() to sort by step number in the reducer

def reducer(self, sessionId, details):
    sdetail = sorted(details, key=lambda x: x[1])  # sort by stepno within each session
    for d in sdetail:
        line_data = '\t'.join(str(n) for n in d)

We are doing the secondary sort so we can scan through the events in order; the sequence is very important for funnel analysis of the logs.

Complete code:

from mrjob.job import MRJob


class uet(MRJob):
    DEFAULT_INPUT_PROTOCOL = 'json_value'    # decode each input line from JSON
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'

    def mapper(self, _, line):
        # 'line' is already a dict thanks to the json_value input protocol
        sessionId = line['sessionId']
        data = line['data']
        if len(sessionId) < 13:  # keep only session ids shorter than 13 characters
            for step in data:
                no = step['no']  # step number within the session
                # emit the whole step record (not the full list) keyed by session
                yield sessionId, (sessionId, no, step)

    def reducer(self, sessionId, details):
        # all steps of a session arrive at the same reducer; sort them by stepno
        sdetail = sorted(details, key=lambda x: x[1])
        for d in sdetail:
            line_data = '\t'.join(str(n) for n in d)
            print line_data  # the tab-delimited line goes straight to the job output


if __name__ == '__main__':
    uet.run()
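To test locally (feature 2 above), a handful of sample lines in the shape the mapper expects is enough. The snippet below is a minimal sketch, assuming each log line is a JSON object with a 'sessionId' string and a 'data' list of step dicts carrying a 'no' field, and that the job above is saved as uet.py; the field names beyond those and all the values are made up.

import json

# build a tiny test file in the assumed input shape: one JSON object per line
sample = [
    {'sessionId': 'ABC123', 'data': [{'no': 2, 'page': 'cart'},
                                     {'no': 1, 'page': 'home'}]},
    {'sessionId': 'XYZ789', 'data': [{'no': 1, 'page': 'home'}]},
]

with open('sample_log.json', 'w') as f:
    for record in sample:
        f.write(json.dumps(record) + '\n')

# then run the job locally, no Hadoop needed:
#   python uet.py sample_log.json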



Tuesday, April 17, 2012

Top NBA Players - by twitter followers

I really don't have any idea how advertising companies decide on the price for certain celebrities, because it is very hard to measure the direct relationship to sales and to derive an ROI. I am also not sure whether any celebrity with an enormous following can make a big impact. I did this data collection for fun, to see who is actually more popular on Twitter and has the larger following. I used Infochimps REST API calls to get the aggregated information and formatted the files to make them readable by Tableau (used Python for the ETL). It seems like SHAQ, even after leaving the league, has a greater fan following than the active players. I haven't included Kobe and Rose, and I tried my best to get the official IDs of each player.
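For flavor, here is a rough sketch of the kind of ETL script that does this. The endpoint, API key and player handles below are illustrative placeholders, not the actual Infochimps API calls used for the post; the idea is just to pull a follower count per handle and write a flat CSV that Tableau can read.

import csv
import json
import urllib2

# Placeholder endpoint -- substitute the real follower-count service URL and key
API_URL = 'http://api.example.com/twitter/users/show?screen_name=%s&apikey=%s'
API_KEY = 'YOUR_API_KEY'

# illustrative subset of official player handles
players = ['SHAQ', 'KingJames', 'DwightHoward', 'carmeloanthony']

with open('nba_followers.csv', 'wb') as out:
    writer = csv.writer(out)
    writer.writerow(['player', 'followers'])
    for handle in players:
        response = json.load(urllib2.urlopen(API_URL % (handle, API_KEY)))
        # assume the service returns a JSON object with a 'followers_count' field
        writer.writerow([handle, response.get('followers_count', 0)])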


Tuesday, April 10, 2012

DW in Hive - handling big dimensions

This is an issue affecting our reporting data quality: how to handle big dimension tables in a Hive data warehouse, and how to balance performance and data quality.

Problem statement:
Currently the dimension Hive table (dim_customer) is partitioned by date. The daily incremental load creates a new partition so that we can improve performance on the reporting side (sketched below). This poses three critical issues:
1. The slowly changing dimension is lost
2. Data quality is compromised
3. We have to filter the dimension table by partition in the reporting layer
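For context, a minimal HiveQL sketch of the current layout; the column list, partition column name and the fact_orders table are assumed, not the actual schema.

-- current approach: one full snapshot per daily load (names illustrative)
CREATE TABLE dim_customer (
    customer_id    BIGINT,
    customer_name  STRING,
    segment        STRING
)
PARTITIONED BY (load_date STRING);

-- every report has to remember to pin the latest partition (issue 3 above)
SELECT f.order_id, d.customer_name
FROM fact_orders f
JOIN dim_customer d ON (f.customer_id = d.customer_id)
WHERE d.load_date = '2012-04-09';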

Solution:
The only way to solve this problem is to have the dimension table as one big Hive table instead of daily partitions. But this creates issues with the refresh strategy and adds overhead to the reporting Hive queries.

The following is a recipe to get past this block; step 1 is certainly the priority.

1. Increase processing power
Hadoop is not only about massive storage, it is also about massive processing. So if we process big files, we have to have a good number of nodes: say we have 30 nodes to process the partitioned dimension table; we may have to move to something like 120 nodes for the single-dimension-file strategy. Processing power is quadrupled here, and it scales roughly in proportion to the data being scanned.

2. Use SQOOP merge
We cannot extract the whole table from the transactional system every time (the source systems might not allow it); we can only capture the changed data. Sqoop merge comes in handy for this purpose: we can overwrite only the incremental records in the Hive table (type 1 SCD). Again, this is a MapReduce program, so we need processing power.
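To illustrate the type 1 overwrite logic (Sqoop merge does the equivalent at the HDFS file level, keyed on the merge key), here is a rough HiveQL sketch; the table and column names are assumed, and the merged result is written to a staging table rather than in place.

-- prefer the delta row whenever a customer_id appears in both base and delta
INSERT OVERWRITE TABLE dim_customer_merged
SELECT
    COALESCE(d.customer_id,   b.customer_id)   AS customer_id,
    COALESCE(d.customer_name, b.customer_name) AS customer_name,
    COALESCE(d.segment,       b.segment)       AS segment
FROM dim_customer b
FULL OUTER JOIN dim_customer_delta d
    ON (b.customer_id = d.customer_id);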

3. Use Bucketed Hive tables
The Hive performance bottleneck shows up while joining. We can create the table with bucketing, which works like a hash index on customer_id (see the sketch below).
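A minimal sketch of the bucketed version; the bucket count and columns are illustrative, not tuned values.

-- hash-distribute the rows on the join key at load time
CREATE TABLE dim_customer_bucketed (
    customer_id    BIGINT,
    customer_name  STRING,
    segment        STRING
)
CLUSTERED BY (customer_id) INTO 32 BUCKETS;

-- needed later so inserts actually honor the bucket definition
SET hive.enforce.bucketing = true;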


If the tables being joined are bucketed on the join column, and the number of buckets in one table is a multiple of the number of buckets in the other, the buckets can be joined with each other. If table A has 8 buckets and table B has 4 buckets, the following join

SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a JOIN b ON a.key = b.key;

can be done on the mapper side only. Instead of fetching B completely for each mapper of A, only the required buckets are fetched: for the query above, the mapper processing bucket 1 of A will only fetch bucket 1 of B. This is not the default behavior; it is governed by the parameter hive.optimize.bucketmapjoin (set hive.optimize.bucketmapjoin = true).

Bucketing and Sqoop merge require good planning and metadata management.
Managing Hadoop from scratch is challenging because we bump into the limits sooner than expected, and we need to adapt quickly or the data might grow beyond those limits. One advantage (and complexity) is that the internal processing (MapReduce) is open, and it is up to the developer to improve it. And the biggest advantage of all is scaling out: you can add nodes easily to really make a difference.