With the Hadoop Streaming API you can use any scripting language (Perl, Ruby, Python, etc.) as long as it can read from STDIN and write to STDOUT (they all can, directly or via an I/O library). The runtimes of these languages must be installed on all data nodes in your cluster so that the input data can be processed in parallel.
In this short blog post I will show you how to create the simplest possible Streaming MapReduce application that can run on the MapReduce (MR) engine, be it MRv1 or YARN (MRv2). The application consists of a single script that uses the cat and wc command-line tools, which ship with any Unix-like OS and therefore don’t require any installation.
You can test the script on any release of Cloudera’s Distribution of Hadoop (CDH) or on the Cloudera QuickStart VM.
I used CDH 5.4.0, which is reflected in the Hadoop streaming jar’s name.
The script counts the number of words in the input file stored on HDFS and it is functionally similar to this UNIX command executed on a single machine:
cat your_input_file | wc -w
The your_input_file file, in this case, is located on the local file system.
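The same single-machine pipeline works over several files at once, which is closer to what the job will later do with a whole folder of input. Here is a minimal sketch; the scratch directory, the a.txt and b.txt files, and their contents are made up purely for illustration:

```shell
# Two small hypothetical input files in a scratch directory.
scratch=$(mktemp -d)
printf 'hello hadoop streaming\n' > "$scratch/a.txt"
printf 'hello world\n' > "$scratch/b.txt"

# Single-machine word count over all .txt files.
# tr strips the padding that some wc implementations add.
total_words=$(cat "$scratch"/*.txt | wc -w | tr -d ' ')
echo "$total_words"

# Clean up the scratch directory.
rm -r "$scratch"
```

With these two sample files the command prints 5 (three words in the first file, two in the second).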
Steps:
- First, you will need to create a folder on HDFS; we will name it IN:
hadoop fs -mkdir IN
- Upload the files you want to process to that folder. Let’s say you want to upload the files with the txt extension that sit in the current working directory on your local file system:
hadoop fs -put *.txt IN
Note: You can also use this command to upload all files in a source directory on the local file system:
hadoop fs -put SomeSourceDirectory/* IN
OK, the files are now in HDFS, in the IN directory under your home directory in /user/.
- Now, fire up your favorite editor and create a file named streamingCatWc.sh containing the following commands:
export SLIB=/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.4.0.jar
hadoop jar $SLIB \
    -input IN \
    -output OUT \
    -mapper /bin/cat \
    -reducer "/usr/bin/wc -w"
The name of the jar file in the script, hadoop-streaming-2.6.0-cdh5.4.0.jar, is specific to the CDH release I am using; the rest of the code is common and can be reused with other versions as well.
The OUT output folder on HDFS will be created automatically by Hadoop. If this folder already exists on HDFS, the job will fail with an exception. In order to proceed, delete the folder using this command:
hadoop fs -rm -r -skipTrash OUT
The cat tool will act as the mapper, receiving your input files in chunks (64 or 128 MB in size, depending on your HDFS configuration); wc -w will act as the reducer, performing the word count operation. The mapper, in our case, will not emit (word, 1) key/value pairs; rather, it will pass each whole slice of the input files through to wc.
- Make streamingCatWc.sh executable:
chmod u+x streamingCatWc.sh
- Run the script:
./streamingCatWc.sh
The big cogs of the MapReduce machinery will slowly start turning, showing you the progress of your job on the console. After a short while, you should see the success message:
… mapreduce.Job: Job job_
… streaming.StreamJob: Output directory: OUT
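If you would like to sanity-check the mapper and reducer commands without a cluster, an ordinary pipeline gives a rough imitation of the streaming data flow: cat plays the mapper, sort stands in for the shuffle phase, and wc -w is the reducer. This is only an approximation (no input splits, a single reducer), and the sample.txt file and its contents are hypothetical test data:

```shell
# Local dry run of the streaming job: mapper | shuffle (sort) | reducer.
# sample.txt and its contents are hypothetical test data.
workdir=$(mktemp -d)
printf 'to be or not to be\nthat is the question\n' > "$workdir/sample.txt"

# cat plays the mapper, sort stands in for the shuffle, wc -w is the reducer.
local_count=$(cat "$workdir/sample.txt" | cat | sort | wc -w | tr -d ' ')
echo "words: $local_count"

# Clean up the scratch directory.
rm -r "$workdir"
```

For this sample input the dry run prints "words: 10". Sorting reorders the lines but does not change the number of words, which is why the shuffle does not affect the result.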
The processing results are placed in the OUT folder as a set of files whose names start with part-: part-00000, part-00001, etc.
You can use this command to view the results on the console:
hadoop fs -cat OUT/part-* | less
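If the job ran with more than one reducer, each part- file holds only that reducer's partial count, so the numbers have to be added up to get the total. Below is a minimal sketch, assuming every part- file contains a single number as printed by wc -w. The sum_counts helper is a hypothetical name, and the printf line merely stands in for the output of hadoop fs -cat OUT/part-* so that the snippet can be tried without a cluster:

```shell
# sum_counts: add up the numbers read from standard input, one per line.
sum_counts() {
    awk '{ total += $1 } END { print total + 0 }'
}

# Stand-in for: hadoop fs -cat OUT/part-* | sum_counts
grand_total=$(printf '120\n75\n' | sum_counts)
echo "$grand_total"
```

With the two made-up partial counts above, the snippet prints 195.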
By the way, with a trivial code modification you can use the above script to find the number of lines in the input files. This task is left as an exercise for the reader. Just don’t forget to delete the OUT HDFS folder before you re-run the script!
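To make re-runs less error-prone, the cleanup can be wrapped in a small guard that deletes the output folder only when it exists. This is a sketch under a few assumptions: clean_output is a hypothetical helper name, the hadoop command is expected to be on your PATH, and the function simply skips the cleanup when it is not:

```shell
# clean_output: remove an HDFS output folder if it exists.
# Hypothetical helper; skips the cleanup when the hadoop CLI is not on PATH.
clean_output() {
    dir="$1"
    if ! command -v hadoop >/dev/null 2>&1; then
        echo "hadoop not found; skipping cleanup of $dir" >&2
        return 0
    fi
    # hadoop fs -test -d returns 0 when the path exists and is a directory.
    if hadoop fs -test -d "$dir"; then
        hadoop fs -rm -r -skipTrash "$dir"
    fi
}

clean_output OUT
```

Placing a call like this at the top of streamingCatWc.sh, before the hadoop jar line, would let you re-run the script without deleting OUT by hand.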