Hadoop Streaming. Practical introduction to MapReduce with Python

data-processing python hadoop mapreduce

Apache Hadoop is a framework for distributed storage and processing. I'm not going to explain how Hadoop modules work or to describe the Hadoop ecosystem, since there are a lot of really good resources that you can easily find in the form of blog entries, papers, books or videos.

Hadoop is written in Java, however, for these two MapReduce examples I'm going to use Python for the mapper and reducer functions. You can use any language that can read and write standard input and outputs for the Hadoop Streaming.

Maximum temperature

Obtain the maximum temperature of each day of 1998.

I'm going to use some weather data from NCDC. Without any reason I've chosen the daily records of 1998. A month is represented per file and I'm going to focus on the date (second column) and maximum temperature (third column) to get the maximum temperature according to the different weather stations (wban - first column).

You can download the data at http://www.ncdc.noaa.gov/orders/qclcd/.

Create sample data to test:

% head -n 100 199801daily.txt >> sample.txt

Set executable permissions:

% chmod +x mapper.py reducer.py

Mapper function:

#!/usr/bin/env python

import sys


for line in sys.stdin:
    if not line.strip() or line.startswith('Wban'):
        continue
    _, k, v = line[:17].split(',', 2)
    try:
        v = int(v)
    except ValueError:
        continue
    print '%s\t%s' % (k, v)

Reducer function:

#!/usr/bin/env python

import sys


last_k = None
max_v = 0

for line in sys.stdin:
    k, v = line.strip().split('\t')
    v = int(v)

    if last_k == k:
        max_v = max(max_v, v)
    else:
        if last_k:
            print '%s\t%s' % (last_k, max_v)
        last_k = k
        max_v = v

if last_k == k:
    print '%s\t%s' % (last_k, max_v)

Test:

% cat sample.txt | ./mapper.py  | sort | ./reducer.py 

Copy data from local filesystem to HDFS:

% hadoop fs -put 1998*.txt /user/$(whoami)/input/

Map and reduce with Hadoop:

% hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input input/1998*.txt \
  -output output \
  -mapper /opt/hadoop/maximum_temperature/mapper.py \ 
  -reducer /opt/hadoop/maximum_temperature/reducer.py

See the results:

% hadoop fs -cat output/*

[...]
19981222    94
19981223    92
19981224    91
19981225    91
19981226    90
19981227    91
19981228    90
19981229    90
19981230    89
19981231    89

Word counter

Count the words of a book.

From a book in text file I'm going to count the times a word is repeated. This is a simple example, so some issues won't be covered (like word contractions).

The book I've chosen is The Trial and you can download it from http://www.gutenberg.org/ebooks/7849

Create sample data:

% head -n 100 the_trial__franz_kafka.txt >> sample.txt

Set executable permissions:

% chmod +x mapper.py reducer.py

Mapper function:

#!/usr/bin/env python

import re
import sys


started = False

for line in sys.stdin:
    if started:
        if line.startswith('*** END OF THIS PROJECT'):
            break
        # Filter out some punctuation marks and set to lowercase.
        line = re.sub(r'["?!.,;:()-]', '', line).strip().lower()
        for word in line.split():
            print '%s\t1' % word
    elif line.startswith('*** START OF THIS PROJECT'):
        started = True

Reducer function:

#!/usr/bin/env python

import sys


last_k = None
last_v = 0

for line in sys.stdin:
    k, v = line.strip().split('\t')
    v = int(v)

    if last_k == k:
        last_v += v
    else:
        if last_k:
            print '%s\t%s' % (last_k, last_v)
        last_k = k
        last_v = v

if last_k == k:
    print '%s\t%s' % (last_k, last_v)

Test:

% cat sample.txt | ./mapper.py  | sort | ./reducer.py 

Copy data from local filesystem to HDFS:

% hadoop fs -put the_trial__franz_kafka.txt /user/$(whoami)/input/

Map and reduce with Hadoop:

% hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input input/the_trial__franz_kafka.txt \
  -output output \
  -mapper /opt/hadoop/word_count/mapper.py \ 
  -reducer /opt/hadoop/word_count/reducer.py

See the results:

% hadoop fs -cat output/*

[...]
you'll    12
you're    67
you've    37
young    30
younger    1
your        94
yours    10
yourself        23
yourselves    3
youth    4