You will remember that checkpointing is a process of truncating an RDD’s lineage graph and saving its materialized version on a persistence store.
First off, we need to designate the checkpoint directory location.
You can use HDFS or a similar distributed persistent store that is supported by Spark and accessible by all worker nodes in the cluster; below I use HDFS.
__14. Enter the following commands (again, I skip some unnecessary details here):
val checkpointDir = "hdfs://{Name Node IP: Port}/{directory location there}"
sc.setCheckpointDir(checkpointDir)
The sc object is the reference to the SparkContext object.
We can only checkpoint an unmaterialized RDD (one against which no action has been executed yet).
__15. Enter the following command (we are continuing to use the over500 RDD from the previous post):
val between500and600 = over500.filter(line => line(5).toInt < 600)
The between500and600 RDD holds the records whose value in the 6th column (index 5) is between 500 and 600.
We basically apply another transformation to extend over500's lineage graph, which we are going to truncate in a moment.
__16. You can print the lineage graph by running this command: between500and600.toDebugString
Now we can see what checkpointing can do for us.
__17. Enter the following command: between500and600.checkpoint()
Nothing really happens to our RDD yet, as this command only injects a checkpoint marker into the RDD lineage graph.
Now let’s materialize our RDD by applying an action and have it persisted in the checkpoint location.
__18. Enter the following command: between500and600.count()
You should see a printed message confirming that checkpointing did happen:
... rdd.RDDCheckpointData: Done checkpointing ...
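If you want to confirm the lazy behavior without reading the logs, here is a minimal standalone sketch. It makes a few assumptions that are not from this post: it runs Spark in local mode, it uses a local temp directory instead of HDFS, and it uses made-up sample rows with a guessed definition of over500 (the real one is in the previous post). In the Spark REPL you would skip the SparkContext setup and use the existing sc.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object LazyCheckpointSketch {
  // Returns (isCheckpointed before the action, isCheckpointed after it, record count).
  def run(): (Boolean, Boolean, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("lazy-checkpoint-sketch")
    val sc = new SparkContext(conf)
    try {
      // Assumption: a local temp directory replaces the HDFS location used in the post.
      sc.setCheckpointDir(Files.createTempDirectory("checkpoint-sketch").toString)

      // Hypothetical sample rows with six columns; the real data set comes from the previous post.
      val rows = sc
        .parallelize(Seq("a,b,c,d,e,450", "a,b,c,d,e,550", "a,b,c,d,e,650"))
        .map(_.split(","))
      val over500 = rows.filter(line => line(5).toInt > 500) // assumed definition
      val between500and600 = over500.filter(line => line(5).toInt < 600)

      between500and600.checkpoint()                // only marks the RDD for checkpointing
      val before = between500and600.isCheckpointed // no action has run yet
      val n = between500and600.count()             // the action triggers the checkpoint write
      val after = between500and600.isCheckpointed
      println(between500and600.getCheckpointFile)  // location of the checkpoint files, if written
      (before, after, n)
    } finally sc.stop()
  }
}
```

Calling LazyCheckpointSketch.run() should return false for the first flag and true for the second, which is the same before/after difference the log message reports.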
__19. Now, if you dump our RDD's lineage graph with between500and600.toDebugString, you should see that the lineage graph was truncated all the way up to the checkpoint designated by between500and600.checkpoint().
At this point, every worker node in the cluster involved in the job has its portion of the between500and600 RDD persisted on HDFS in the checkpoint directory as part-XXXXX fragments.
But our old over500 RDD (from which we derived between500and600) still has its full lineage (use the over500.toDebugString command to verify that). We just severed the link between over500 and between500and600.
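The severed link can also be checked programmatically through each RDD's dependencies. The sketch below is only an illustration under assumptions that are not from this post: local mode, a local temp checkpoint directory, and a tiny made-up integer data set in place of the real records. The names source and SeveredLineageSketch are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object SeveredLineageSketch {
  // Returns (child depended on parent before the checkpoint,
  //          child still depends on parent after it,
  //          parent still depends on its own source).
  def run(): (Boolean, Boolean, Boolean) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("severed-lineage-sketch")
    val sc = new SparkContext(conf)
    try {
      // Assumption: a local temp directory replaces the HDFS location used in the post.
      sc.setCheckpointDir(Files.createTempDirectory("checkpoint-sketch").toString)

      // Hypothetical stand-in data: plain integers instead of six-column records.
      val source = sc.parallelize(Seq(450, 550, 650))
      val over500 = source.filter(_ > 500)
      val between500and600 = over500.filter(_ < 600)

      val linkedBefore = between500and600.dependencies.head.rdd eq over500

      between500and600.checkpoint()
      between500and600.count() // the action writes the checkpoint

      // After the checkpoint the child reads from the checkpoint files, not from over500.
      val linkedAfter = between500and600.dependencies.head.rdd eq over500
      // The parent was never checkpointed, so its own lineage is untouched.
      val parentIntact = over500.dependencies.head.rdd eq source

      println(over500.toDebugString)
      println(between500and600.toDebugString)
      (linkedBefore, linkedAfter, parentIntact)
    } finally sc.stop()
  }
}
```

The first flag should be true and the second false, while the third stays true: only the child's lineage was cut.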
That’s all there is to know about checkpointing.
You can shut down your Spark REPL session.
Till next time!