This tutorial is adapted from Web Age course Hadoop Programming on the Cloudera Platform.
In this tutorial, you will work through two functionally equivalent examples / demos – one written in Hive (v. 1.1) and the other written using the PySpark API for the Spark SQL module (v. 1.6) – to see the differences between the command syntax of these popular Big Data processing systems. Both examples / demos were prepared using CDH 5.13.0, which is what you are going to use in this exercise.
We will work against the same input file in both examples, running functionally equivalent query statements against that file.
Both the Hive and PySpark shells support the Unix command-line shortcuts that let you quickly navigate along a single command line:
Ctrl-a Moves the cursor to the start of the line.
Ctrl-e Moves the cursor to the end of the line.
Ctrl-k Deletes (kills) the line contents to the right of the cursor.
Ctrl-u Deletes the line contents to the left of the cursor.
Part 1 – Connect to the Environment
- Download and install Cloudera’s Quickstart VM. Follow this link for instructions.
Part 2 – Setting up the Working Environment
All the steps in this tutorial will be performed in the /home/cloudera/Works directory. Create this directory if it does not exist.
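If you need to create it, a standard shell command will do:
mkdir -p ~/Works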
1. In a new terminal window, type in the following command:
cd ~/Works
Before we begin, we need to make sure that Hive-related services are up and running.
2. Enter the following commands one after another:
sudo /etc/init.d/hive-metastore status
sudo /etc/init.d/hive-server2 status
If you see their status as not running, start the stopped service(s) using the following commands:
sudo /etc/init.d/hive-metastore start
sudo /etc/init.d/hive-server2 start
3. Enter the following command:
hive --version
You should see the following output:
Hive 1.1.0-cdh5.13.0
Subversion file:///data/jenkins/workspace/generic-package-rhel64-6-0/topdir/BUILD/hive-1.1.0-cdh5.13.0 -r Unknown
Compiled by jenkins on Wed Oct 4 11:06:55 PDT 2017
From source with checksum 4c9678e964cc1d15a0190a0a1867a837
4. Enter the following command to download the input file:
wget 'http://bit.ly/36fGR32' -O files.csv
The file is in the CSV format with a header row; it has the following comma-delimited fields:
1. File name
2. File size
3. Month of creation
4. Day of creation
And the first four rows of the file are:
FNAME,FSIZE,MONTH,DAY
a2p,112200,Feb,21
abrt-action-analyze-backtrace,13896,Feb,22
abrt-action-analyze-c,12312,Feb,22
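If you want to verify this locally, the standard head command will print those rows:
head -4 files.csv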
5. Enter the following commands to put the file on HDFS:
hadoop fs -mkdir hive_demo
hadoop fs -put files.csv hive_demo/
The file is now in the hive_demo directory on HDFS – that’s where we are going to load it from when working with both Hive and Spark.
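You can confirm the upload with an HDFS directory listing, for example:
hadoop fs -ls hive_demo/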
Part 3 – The Hive Example / Demo
We will use the hive tool to start the interactive Hive shell (REPL) rather than the now-recommended beeline tool, which is a bit more ceremonial; here we are going to ignore Beeline's client-server architecture advantages in favor of the simplicity of hive.
1. Enter the following command to start the Hive REPL:
hive
Note: You start the Beeline shell in embedded mode by running this command:
beeline -u jdbc:hive2://
To connect to a remote Hive server (that is the primary feature of Beeline), use this command:
beeline -n hive -u jdbc:hive2://<hive server IP>:<port>
e.g. beeline -n hive -u jdbc:hive2://quickstart:10000
Now, let’s create a table using the Hive Data Definition Language (DDL).
2. Enter the following command:
CREATE EXTERNAL TABLE xfiles (fileName STRING,fileSize INT,month STRING,day INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/cloudera/hive_demo/' tblproperties ("skip.header.line.count"="1");
Note that we have created it as an external Hive table.
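Because the table is external, dropping it later would remove only the table metadata, not the files.csv data on HDFS. If you want to see the exact DDL that Hive stored for the table, you can ask Hive to reproduce it:
show create table xfiles;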
3. Enter the following command to print the basic table schema info:
describe xfiles;
You should see the following output:
filename string
filesize int
month string
day int
4. Enter the following command to print the extended table metadata:
desc formatted xfiles;
You should see the following output:
# col_name data_type comment
filename string
filesize int
month string
day int
# Detailed Table Information
Database: default
Owner: cloudera
CreateTime: Mon Feb 24 07:35:26 PST 2020
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://quickstart.cloudera:8020/user/cloudera/hive_demo
Table Type: EXTERNAL_TABLE
Table Parameters:
EXTERNAL TRUE
skip.header.line.count 1
transient_lastDdlTime 1582558526
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
field.delim ,
serialization.format ,
5. Enter the following command to fetch the first ten rows of the table:
select * from xfiles limit 10;
6. Enter the following basic SQL command:
select month, count(*) from xfiles group by month;
You should see the following output:
Apr 305
Aug 93
Dec 103
Feb 279
Jan 5
Jul 63
Jun 129
Mar 2
May 5
Nov 221
Oct 9
Sep 66
7. Enter the following more advanced SQL command:
select month, count(*) as fc from xfiles group by month having fc > 100;
You should see the following output:
Apr 305
Dec 103
Feb 279
Jun 129
Nov 221
8. Enter the following Hive DDL and DML commands (we are creating and populating a table in PARQUET columnar store format):
CREATE TABLE xfiles_prq LIKE xfiles STORED AS PARQUET;
INSERT OVERWRITE TABLE xfiles_prq SELECT * FROM xfiles;
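You can spot-check that the new table was populated, for example:
select * from xfiles_prq limit 5;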
9. Enter the following command:
desc formatted xfiles_prq;
In the output of the above command, locate the location of the xfiles_prq managed Hive table; it should be:
hdfs://quickstart.cloudera:8020/user/hive/warehouse/xfiles_prq
It is a directory that contains a file in the PARQUET format:
/user/hive/warehouse/xfiles_prq/000000_0
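A quick HDFS listing of that directory will show the Parquet data file(s):
hadoop fs -ls /user/hive/warehouse/xfiles_prq/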
Part 4 – The Spark Example / Demo
1. Start a new terminal and navigate to ~/Works
2. Start a new PySpark session by entering the following command:
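pyspark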
Notice that PySpark v.1.6 uses Python v.2.6.6.
3. Enter the following command:
xfiles = sqlContext.read.load('/user/hive/warehouse/xfiles_prq/')
We are reading the file in the PARQUET format, which is the default for the load method.
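If you prefer to be explicit about the input format, you can pass it to load, or use the parquet reader shortcut; either of the following is equivalent:
xfiles = sqlContext.read.load('/user/hive/warehouse/xfiles_prq/', format='parquet')
xfiles = sqlContext.read.parquet('/user/hive/warehouse/xfiles_prq/')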
4. Enter the following command:
type(xfiles)
You should see the following output:
<class 'pyspark.sql.dataframe.DataFrame'>
5. Enter the following command to print the schema of xfiles (it is somewhat equivalent to Hive's describe xfiles command):
xfiles.printSchema()
You should see the following output (notice that Spark accurately recreated the schema of the file; where do you think it got the clues?):
root
 |-- filename: string (nullable = true)
 |-- filesize: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
Now we will repeat all the queries from the Hive shell lab part.
In PySpark you have two primary options for querying structured data:
- using the sqlContext's sql method, or
- using the Spark DataFrame API.
We will illustrate both.
To use the sqlContext’s sql method, you need to first register the DataFrame as a temp table that is associated with the active sqlContext; this temp table’s lifetime is tied to that of your current Spark session.
6. Enter the following command:
xfiles.registerTempTable('xfiles_tmp')
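If you want to confirm the registration, tableNames() lists the tables known to the current context (for a HiveContext this also includes the Hive tables in the current database):
sqlContext.tableNames()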
7. Enter the following command to fetch the first ten rows of the table:
sqlContext.sql('select * from xfiles_tmp').show(10)
You should see the following output (abridged for space below):
+--------------------+--------+-----+---+
| filename|filesize|month|day|
+--------------------+--------+-----+---+
| a2p| 112200| Feb| 21|
|abrt-action-analy...| 13896| Feb| 22|
|abrt-action-analy...| 12312| Feb| 22|
|abrt-action-analy...| 6676| Feb| 22|
|abrt-action-analy...| 10720| Feb| 22|
|abrt-action-analy...| 11016| Feb| 22|
. . .
8. Enter the following command:
xfiles.select('*').show(10)
9. Enter the following command (note that it queries the Hive table xfiles directly; this works because the sqlContext created by the CDH PySpark shell is a HiveContext):
sqlContext.sql('select month, count(*) from xfiles group by month').show(10)
You should see the following output:
+-----+---+
|month|_c1|
+-----+---+
| Jul| 63|
| Jun|129|
| Apr|305|
| Feb|279|
| Oct| 9|
| Nov|221|
| Mar| 2|
| May| 5|
| Dec|103|
| Aug| 93|
+-----+---+
10. Enter the following command:
xfiles.groupBy('month').count().show(10)
You should see output similar to the above:
+-----+-----+
|month|count|
+-----+-----+
| Jul| 63|
| Jun| 129|
| Apr| 305|
| Feb| 279|
| Oct| 9|
| Nov| 221|
| Mar| 2|
| May| 5|
| Dec| 103|
| Aug| 93|
+-----+-----+
11. Enter the following command:
sqlContext.sql('select month, count(*) as fc from xfiles group by month having fc > 100').show()
You should see the following output:
+-----+---+
|month| fc|
+-----+---+
| Jun|129|
| Apr|305|
| Feb|279|
| Nov|221|
| Dec|103|
+-----+---+
12. Enter the following command:
xfiles.groupBy('month').count() \
    .rdd.filter(lambda r: r[1] > 100) \
    .toDF().show()
You should see the output matching the one generated by the previous command.
Alternatively, you can use this more verbose but more Pythonic type of command:
map(lambda r: (r[0], r[1]), filter(lambda r: r[1] > 100, xfiles.groupBy('month').count().collect()))
The output will be a bit more difficult to read, though: a plain Python list of (month, count) tuples rather than a formatted table.
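A more idiomatic alternative is to stay within the DataFrame API and filter on the aggregated column (which count() names count), avoiding the detour through the RDD API; a minimal sketch:
from pyspark.sql.functions import col
xfiles.groupBy('month').count().filter(col('count') > 100).show()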
Part 5 – Creating a Spark DataFrame from Scratch
You can create a Spark DataFrame from raw data sitting in memory and have Spark infer the schema from the data itself. The choices for column types are somewhat limited: PySpark does not support dates or booleans, but in most practical cases what PySpark supports is more than enough.
1. Enter the following commands to create a list of tuples (records) representing our data (we simulate data coming from different sources here):
txnids = [1, 2, 3]
items = ['A', 'B', 'C']
dates = ['2020-02-03', '2020-02-10', '2020-02-17']
prices = [123.99, 3.5, 45.67]
records = zip(txnids, dates, items, prices)
2. Enter the following command to create a list of column names:
col_names = ['txnid', 'Date', 'Item', 'Price']
3. Enter the following command to create a DataFrame:
df = sqlContext.createDataFrame(records, col_names)
If you print the DataFrame's schema and its data (with df.printSchema() and df.show()), you will see these outputs:
root
 |-- txnid: long (nullable = true)
 |-- Date: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price: double (nullable = true)

+-----+----------+----+------+
|txnid|      Date|Item| Price|
+-----+----------+----+------+
|    1|2020-02-03|   A|123.99|
|    2|2020-02-10|   B|   3.5|
|    3|2020-02-17|   C| 45.67|
+-----+----------+----+------+
You can apply date-time related transformations using the functions from the pyspark.sql.functions module.
4. Enter the following commands:
from pyspark.sql.functions import date_format
df2 = df.select(df.txnid, df.Item, date_format('Date', 'MM-dd-yyyy').alias('usdate'))
This command is functionally equivalent to the INSERT INTO … SELECT … FROM SQL data transfer idiom. We also apply the date formatting function date_format().
The df2 DataFrame has the following data (displayed with df2.show()):
+-----+----+----------+
|txnid|Item|    usdate|
+-----+----+----------+
|    1|   A|02-03-2020|
|    2|   B|02-10-2020|
|    3|   C|02-17-2020|
+-----+----+----------+
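If you wanted to persist df2, the DataFrameWriter API can save it back to HDFS; a sketch, using a hypothetical output path of your choosing:
df2.write.mode('overwrite').parquet('/user/cloudera/txn_demo')  # hypothetical output path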