Data analysis with Apache Pig. A practical introduction

data-analysis hadoop pig

Apache Pig is a platform for analyzing large datasets. Pig offers a higher level of abstraction than raw MapReduce, so you can work with richer data structures, although generally at the cost of some performance.

Pig can run in local mode (a single JVM using the local filesystem) or in MapReduce mode, where it translates scripts into MapReduce jobs and runs them on a Hadoop cluster. The following examples work the same way in either mode.

Simple example with CSV records

Using an NCDC weather dataset of hourly precipitation at American weather stations, get the total precipitation per station and the total precipitation per date for a single station.

Read the data (use your own path to the piggybank file and dataset):

-- use CSVExcelStorage from piggybank to read the CSV, skipping the header
REGISTER '/usr/local/pig/contrib/piggybank/java/piggybank.jar';
records = LOAD '/input/199804hpd.txt'
    USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER')
    AS (wban:chararray, date:chararray, time:chararray, hp:float);

Analyze the data:

-- total precipitation registered at each weather station
grouped_records = GROUP records BY wban;
sum_records = FOREACH grouped_records GENERATE group, SUM(records.hp);
DESCRIBE sum_records;
DUMP sum_records;

-- daily precipitation registered at weather station 93808
filtered_records = FILTER records BY wban == '93808'; 
grouped_records = GROUP filtered_records BY date;
sum_records = FOREACH grouped_records GENERATE group, SUM(filtered_records.hp);
DESCRIBE sum_records;
DUMP sum_records;
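
The per-station totals are easier to read when the summed column has a name and the output is sorted. The following is a minimal sketch of that idea, not part of the original analysis: the aliases by_station, totals, ranked and top10 and the field name total_hp are illustrative, the limit of 10 is arbitrary, and the USING clause is an assumption about how CSVExcelStorage is invoked to skip the header.

```pig
-- same piggybank path and input file as in the example above
REGISTER '/usr/local/pig/contrib/piggybank/java/piggybank.jar';
records = LOAD '/input/199804hpd.txt'
    -- assumption: these arguments make CSVExcelStorage skip the header row
    USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER')
    AS (wban:chararray, date:chararray, time:chararray, hp:float);

-- total precipitation per station, with named output fields
by_station = GROUP records BY wban;
totals = FOREACH by_station GENERATE group AS wban, SUM(records.hp) AS total_hp;

-- sort by total precipitation, highest first, and keep the first ten rows
ranked = ORDER totals BY total_hp DESC;
top10 = LIMIT ranked 10;
DUMP top10;
```

Naming the fields with AS also makes the DESCRIBE output self-explanatory, because the schema shows wban and total_hp instead of an unnamed column.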

More complex dataset structure

Read a fixed-width dataset with many columns of weather information and extract only the columns of interest:

-- read file without schema
records = LOAD '/input/199803dailyavg.txt';

-- skip the first row (header) and treat each whole row as a single chararray field
records = STREAM records THROUGH `tail -n +2` AS (row:chararray);

-- extract the desired fields by slicing the row at fixed character positions
records = FOREACH records GENERATE 
    SUBSTRING(row, 0, 5) AS wban, 
    SUBSTRING(row, 6, 12) AS year_month,
    (float)TRIM(SUBSTRING(row, 15, 20)) AS max_temp,
    (float)TRIM(SUBSTRING(row, 27, 31)) AS min_temp;

DESCRIBE records;
DUMP records;
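
Streaming through tail relies on an external command, and on a cluster that command may run once per task rather than once per file. One possible alternative, sketched below under stated assumptions, reads each line with the built-in TextLoader and drops the header with a FILTER. The pattern assumes that data rows start with a five-digit WBAN identifier and that the header line does not. The aliases lines, data_rows, temps, valid and ranges and the field names highest and lowest are illustrative.

```pig
-- read every line of the file as one chararray field
lines = LOAD '/input/199803dailyavg.txt' USING TextLoader() AS (row:chararray);

-- assumption: data rows begin with five digits (the WBAN id), the header does not
data_rows = FILTER lines BY row MATCHES '[0-9]{5}.*';

-- slice out the same columns as in the example above
temps = FOREACH data_rows GENERATE
    SUBSTRING(row, 0, 5) AS wban,
    (float)TRIM(SUBSTRING(row, 15, 20)) AS max_temp,
    (float)TRIM(SUBSTRING(row, 27, 31)) AS min_temp;

-- a failed cast yields null, so drop rows with missing temperatures
valid = FILTER temps BY max_temp IS NOT NULL AND min_temp IS NOT NULL;

-- highest maximum and lowest minimum temperature per station
by_station = GROUP valid BY wban;
ranges = FOREACH by_station GENERATE
    group AS wban,
    MAX(valid.max_temp) AS highest,
    MIN(valid.min_temp) AS lowest;
DUMP ranges;
```

MATCHES must match the whole string, which is why the pattern ends with `.*`.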