Wednesday, April 25, 2012

Secondary Sort using Python and MRJob

MRJob is an excellent module developed as an open-source project by Yelp. I chose mrjob because of the following features:


  1. It provides a seamless JSON reader and writer (i.e., the mapper can read JSON lines and receive them already converted into Python lists/dicts); see the short sketch after this list
  2. We can test a Hadoop job locally (on Windows or Unix) on a small dataset without actually touching huge HDFS files (quick!)
  3. It can orchestrate many mappers and reducers in the same code
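
To illustrate the first point, here is a minimal sketch (with a made-up log line whose fields mirror the job below) of what the json_value input protocol effectively does before the mapper is called:

import json

# A made-up web-log line, as it would appear in the input file.
raw_line = '{"sessionId": "ABC123", "data": [{"no": 2, "data": "cart"}, {"no": 1, "data": "home"}]}'

# This is roughly what mrjob's json_value input protocol does for every line:
# decode the JSON so the mapper receives a dict/list structure, not a raw string.
line = json.loads(raw_line)

assert line['sessionId'] == 'ABC123'
assert line['data'][1]['no'] == 1

And because mrjob's default runner executes the whole job as an ordinary local process, you can point the script at a small sample file on your laptop long before going anywhere near HDFS.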

My task is to parse JSON-formatted web log files, where the columns are sessionid, stepno and data. The pseudo-code:
  1. Read the JSON files using an mrjob protocol
        DEFAULT_INPUT_PROTOCOL = 'json_value'
        DEFAULT_OUTPUT_PROTOCOL = 'repr_value'  #  output is delimited

  2.  yield sessionid, (sessionid, stepno, data) from the mapper
                      MapReduce will make sure that all records with the same sessionid (the key) go to the same reducer, with the remaining fields sent as a tuple (the value) to make it easy for us to sort in the reducer

 3.  Use Python's built-in sorted() to order each session's records by step number in the reducer (see the snippet and the small sketch below)

 def reducer(self, sessionId, details):
     sdetail = sorted(details, key=lambda x: x[1])  # sorting by stepno for each session
     for d in sdetail:
         line_data = '\t'.join(str(n) for n in d)
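
To make steps 2 and 3 concrete, here is a small standalone sketch (with made-up values, not part of the job) of what one reducer call receives and how sorted() restores the step order:

# Tuples that one reducer call might receive for a single session,
# in arbitrary shuffle order.
details = [
    ('ABC123', 3, 'checkout'),
    ('ABC123', 1, 'home'),
    ('ABC123', 2, 'cart'),
]

# Sort by the step number, i.e. the second element of each tuple.
sdetail = sorted(details, key=lambda x: x[1])

# sdetail is now in step order:
# [('ABC123', 1, 'home'), ('ABC123', 2, 'cart'), ('ABC123', 3, 'checkout')]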

We do the secondary sort so that we can scan through each session's events in order, since the sequence is very important for doing funnel analysis on the logs.
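
As a hypothetical example of that downstream use (not part of the job itself), once a session's pages are in step order a funnel check becomes a simple ordered-subsequence test:

def passed_funnel(ordered_pages, funnel):
    # Hypothetical helper: True if the session visited the funnel pages in
    # this order (other pages may occur in between). Relies on 'in' consuming
    # the iterator up to and including the match.
    it = iter(ordered_pages)
    return all(page in it for page in funnel)

# Pages of one session, already in step order thanks to the secondary sort.
session_pages = ['home', 'search', 'cart', 'checkout']
assert passed_funnel(session_pages, ['home', 'cart', 'checkout'])
assert not passed_funnel(session_pages, ['cart', 'home'])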

Complete code:

import sys
#sys.path.append('/usr/lib/python2.4/site-packages/')
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class uet(MRJob):
    # string-style protocol names (mrjob 0.3.x); newer mrjob versions set
    # INPUT_PROTOCOL = JSONValueProtocol on the class instead
    DEFAULT_INPUT_PROTOCOL = 'json_value'
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'

    def mapper(self, _, line):
        sessionId = line['sessionId']
        data = line['data']
        if len(sessionId) < 13:              # keep only session ids shorter than 13 characters
            for i in range(len(data)):
                no = data[i]['no']           # step number of this event
                # key = sessionId, value = (sessionId, stepno, data)
                yield sessionId, (sessionId, no, data)

    def reducer(self, sessionId, details):
        # secondary sort: order this session's events by step number
        sdetail = sorted(details, key=lambda x: x[1])
        for d in sdetail:
            line_data = '\t'.join(str(n) for n in d)
            # write the tab-delimited line straight to the task's output stream
            print line_data


if __name__ == '__main__':
    uet.run()
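
To try the job locally on a small sample, you can run something like python uet.py sample_weblog.json (the file name is just an example); mrjob's default inline runner executes the map, sort and reduce phases in a single local process and writes the results to standard output.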


