MRJob is an excellent module developed as an open source project by Yelp. I chose mrjob because of the following features:
- It provides a seamless JSON reader and writer (i.e. the mapper can read JSON lines and receive them already decoded into Python dicts and lists)
- We can test a Hadoop job locally (on Windows or Unix) on a small dataset without actually using huge HDFS files (quick!)
- It can orchestrate many mappers and reducers in the same code
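To make the first point concrete, here is a minimal sketch of what reading JSON values amounts to, written with only the standard library rather than mrjob itself. It assumes one JSON object per input line; the sample lines are made up for illustration, and `read_json_values` is a hypothetical helper, not part of mrjob.

```python
import json

# Made-up sample log lines, one JSON object per line (an assumption for illustration).
raw_lines = [
    '{"sessionId": "abc123", "data": [{"no": 2}, {"no": 1}]}',
    '{"sessionId": "xyz789", "data": [{"no": 1}]}',
]


def read_json_values(lines):
    """Decode each non-empty line into a Python object.

    Hypothetical helper: this is roughly what a JSON value reader
    hands to the mapper, not mrjob's actual implementation.
    """
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


records = list(read_json_values(raw_lines))
# Each record is now a plain dict, so fields can be accessed by name.
print(records[0]["sessionId"], len(records[0]["data"]))
```

The mapper then works with ordinary dicts and lists instead of parsing strings by hand.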
My task is to parse JSON-formatted web log files; say the columns are sessionid, stepno and data. The pseudo-code is:
1. Read the JSON files using the mrjob protocol
    DEFAULT_INPUT_PROTOCOL = 'json_value'
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value' # output is delimited
2. Yield sessionid, (sessionid, stepno, data) from the mapper
MapReduce will make sure that all records with the same sessionid (the key) go to the same reducer, with the remaining fields sent along as a tuple (the value) to make it easier for us to sort in the reducer.
3. Use Python's built-in sorted to sort by step number in the reducer
    def reducer(self, sessionId, details):
        sdetail = sorted(details, key=lambda x: x[1]) # sorting by stepno for each session
        for d in sdetail:
            line_data = '\t'.join(str(n) for n in d)
We do this secondary sort so that the events of each session can be scanned in order, since the sequence is very important for funnel analysis of the logs.
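The three steps above can be sketched end to end in plain Python, without Hadoop or mrjob, to show why the sort happens in the reducer. This is only a simulation under assumptions: the sample records and the `page` field are invented, `shuffle` is a hypothetical stand-in for the framework's group-by-key step, and each emitted value carries a single step rather than the whole data list.

```python
from collections import defaultdict

# Invented sample records; the "page" field is an assumption for illustration.
records = [
    {"sessionId": "s1", "data": [{"no": 3, "page": "pay"}, {"no": 1, "page": "home"}]},
    {"sessionId": "s2", "data": [{"no": 1, "page": "home"}]},
    {"sessionId": "s1", "data": [{"no": 2, "page": "cart"}]},
]


def mapper(record):
    """Emit one (key, value) pair per step, keyed by session id."""
    for step in record["data"]:
        yield record["sessionId"], (record["sessionId"], step["no"], step["page"])


def shuffle(pairs):
    """Hypothetical stand-in for the framework: group values by key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reducer(session_id, details):
    """Sort one session's steps by step number and emit tab-delimited lines."""
    for d in sorted(details, key=lambda x: x[1]):
        yield "\t".join(str(n) for n in d)


pairs = [kv for record in records for kv in mapper(record)]
lines = [
    line
    for session_id, values in sorted(shuffle(pairs).items())
    for line in reducer(session_id, values)
]
for line in lines:
    print(line)
```

Note that the steps of session s1 arrive out of order and from two different input records; only after grouping by key can the reducer put them back in sequence. Sorting in memory like this is fine as long as a single session's events fit in the reducer's memory.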
Complete code:
    import sys, time
    #sys.path.append('/usr/lib/python2.4/site-packages/')
    from mrjob.job import MRJob
    from mrjob.protocol import JSONValueProtocol
    from itertools import groupby
    from operator import itemgetter, attrgetter

    class uet(MRJob):
        DEFAULT_INPUT_PROTOCOL = 'json_value'   # each input line is decoded from JSON
        DEFAULT_OUTPUT_PROTOCOL = 'repr_value'

        def mapper(self, _, line):
            sessionId = line['sessionId']
            data = line['data']
            # only sessions whose id is shorter than 13 characters are processed
            if len(sessionId) < 13:
                for i in range(len(data)):
                    no = data[i]['no']
                    # key by session so that all of its steps meet in one reducer
                    yield sessionId, (sessionId, no, data)

        def reducer(self, sessionId, details):
            sdetail = sorted(details, key=lambda x: x[1]) # sorting by stepno for each session
            for d in sdetail:
                line_data = '\t'.join(str(n) for n in d)
                # written straight to stdout as a tab-delimited line
                print(line_data)

    if __name__ == '__main__':
        uet.run()