
Using Hadoop Streaming programs with Spark Pipes

Hadoop provides bindings (APIs) only for Java. To implement MapReduce programs in other languages, Hadoop Streaming can be used: any language that can read from and write to the standard streams can be used with Hadoop. It’s good that we can write MapReduce programs in non-Java languages when required, but the catch is the overhead of launching an additional process for each block of data and of the inter-process communication.

In any case, the non-Java program reads data from standard input, processes it and writes the results to standard output. Spark has a similar mechanism for integrating external programs which read/write to STDIO, so any streaming program developed for Hadoop/MapReduce can be used with minimal or no changes in Spark through the RDD.pipe() API (pyspark.rdd.RDD.pipe() in the case of Python). The max_temperature_map.rb program from the Hadoop: The Definitive Guide book can be plugged into a Spark program to find the maximum temperature for a year, as shown in the code snippet below. max_temperature_map.rb takes a line from the RDD as input, extracts the fields of interest and returns only the valid records, which form a new RDD.
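A minimal sketch of how this might look in PySpark follows. The HDFS paths are placeholders, and max_temperature_map.rb is assumed to be executable and present on every worker node (for example shipped with sc.addFile() or the --files option of spark-submit).

```python
from pyspark import SparkContext

sc = SparkContext(appName="MaxTemperatureWithPipe")

lines = sc.textFile("hdfs:///user/hduser/weather")  # raw NCDC weather records (assumed path)

# pipe() writes each record to the script's STDIN; every "year\ttemperature"
# line the script prints to STDOUT becomes an element of the new RDD.
mapped = lines.pipe("max_temperature_map.rb")

# Split the tab-separated output and keep the maximum temperature per year.
max_per_year = (mapped
                .map(lambda line: line.split("\t"))
                .map(lambda fields: (fields[0], int(fields[1])))
                .reduceByKey(max))

max_per_year.saveAsTextFile("hdfs:///user/hduser/max-temperature-output")
```

The rest of the pipeline stays in regular PySpark; only the record-level extraction is delegated to the external Ruby process.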

Conclusion

Big Data frameworks provide bindings mostly for Java, since most of them are developed in Java. Integration of Big Data frameworks with multiple languages is really important, yet writing extensions in other languages is often not possible or is a bit of a pain. All Hadoop Streaming programs, however, can be integrated with Spark with minimal or no changes.

Spark Accumulators vs Hadoop Counters

In the previous blog, we looked at how to find the maximum temperature for a particular year using Spark/Python. Now, we will extend that program to count the number of valid and invalid records in the weather data set using accumulators. Spark accumulators are similar to Hadoop counters and can be used to count the number of times an event happens during job execution.
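A minimal sketch of how this might look, assuming the NCDC-style fixed-width records from the earlier posts; the field offsets, the validity check and the HDFS paths are illustrative, not the original code.

```python
from pyspark import SparkContext

sc = SparkContext(appName="MaxTemperatureWithAccumulators")

valid_records = sc.accumulator(0)
invalid_records = sc.accumulator(0)

def extract_data(line):
    """Return [(year, temperature)] for a valid record, [] otherwise."""
    year = line[15:19]
    temperature = int(line[87:92])
    quality = line[92:93]
    if temperature != 9999 and quality in "01459":
        valid_records.add(1)
        return [(year, temperature)]
    invalid_records.add(1)
    return []

max_per_year = (sc.textFile("hdfs:///user/hduser/weather")   # assumed input path
                  .flatMap(extract_data)
                  .reduceByKey(max))

max_per_year.saveAsTextFile("hdfs:///user/hduser/max-temperature-output")

# Accumulator values are read on the driver, after an action has materialised the RDD.
print("valid = %d, invalid = %d" % (valid_records.value, invalid_records.value))
```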

In the sunny-day scenario, each RDD record is processed only once. But in a failure scenario the same RDD record might be processed multiple times, and so the accumulator would be updated multiple times for the same record, leading to an incorrect accumulator value. The same is the case with Hadoop: a block of data might be processed multiple times, either due to a failure or due to speculative execution, and the counter might be incremented multiple times for the same record. Not exactly sure how this failure scenario is handled in Spark and Hadoop.

map vs flatMap in Spark

In the previous blog posts with Spark examples, RDD.flatMap() has been used. In this blog we will look at the differences between RDD.map() and RDD.flatMap().

map and flatMap are similar, in the sense that they take each element (a line, in these examples) from the input RDD and apply a function to it. They differ in that the function passed to map returns exactly one element, while the function passed to flatMap can return a list of elements (0 or more).

Also, the output of flatMap is flattened: although the function passed to flatMap returns a list of elements, flatMap returns an RDD which contains all the elements of those lists in a flat form (not as lists).

If that sounds a bit confusing, the code snippet below applies both map and flatMap to the same input lines and dumps the output in HDFS to the wordsWithMap and wordsWithFlatMap folders.
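A minimal sketch along those lines; the input path is a placeholder.

```python
from pyspark import SparkContext

sc = SparkContext(appName="MapVsFlatMap")

lines = sc.textFile("hdfs:///user/hduser/input")   # assumed input path

# map: one output element per input line -- here, a list of words per line.
words_with_map = lines.map(lambda line: line.split(" "))

# flatMap: the per-line lists are flattened into a single RDD of words.
words_with_flat_map = lines.flatMap(lambda line: line.split(" "))

words_with_map.saveAsTextFile("hdfs:///user/hduser/wordsWithMap")
words_with_flat_map.saveAsTextFile("hdfs:///user/hduser/wordsWithFlatMap")
```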

The output of the map function in HDFS (screenshot: map-output)
The output of the flatMap function in HDFS (screenshot: flatMap-output)

Conclusion

The function passed to map returns a single element, while the function passed to flatMap returns a list of elements (0 or more). And also, the output of flatMap is flattened.

In the case of word count, where the input line is split into multiple words, flatMap can be used. Also, in the case of the weather data set, the extractData method validates each record and may or may not return a value; here too, flatMap can be used.

Finding the top 3 movies using Spark/Python

Given the movie and movie rating data in the below format, along with scaled-down sample data sets for movierating and movie, the Spark program aggregates the movie ratings and gives the top 3 movies by name. For this to happen, an aggregation has to be done on the movierating data and a join has to be performed with the movie data.

Note that the MapReduce model doesn’t support a join primitive, so a join has to be forcibly fitted into the map and reduce primitives. Spark, on the other hand, supports join and other high-level primitives, which makes it easier to write Spark programs.

movie
id (pk) | name | year

movierating
userid | movieid (fk) | rating

The Spark program is a bit more complex than the previous programs in this blog, but it can be easily understood by going through the comments and the Spark Python API documentation.
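A minimal sketch of such a program, assuming tab-delimited input files at placeholder HDFS paths and using the total rating as the ranking metric (swap in an average if that is preferred).

```python
from pyspark import SparkContext

sc = SparkContext(appName="Top3Movies")

# movie: id | name | year  ->  (id, name)
movies = (sc.textFile("hdfs:///user/hduser/movie")
            .map(lambda line: line.split("\t"))
            .map(lambda fields: (fields[0], fields[1])))

# movierating: userid | movieid | rating  ->  (movieid, rating)
ratings = (sc.textFile("hdfs:///user/hduser/movierating")
             .map(lambda line: line.split("\t"))
             .map(lambda fields: (fields[1], int(fields[2]))))

# Aggregate the ratings per movie, join with the movie names and pick the top 3.
top3 = (ratings.reduceByKey(lambda a, b: a + b)              # (movieid, aggregatedRating)
               .join(movies)                                  # (movieid, (aggregatedRating, name))
               .map(lambda pair: (pair[1][0], pair[1][1]))    # (aggregatedRating, name)
               .sortByKey(ascending=False)
               .take(3))

for rating, name in top3:
    print("%s\t%d" % (name, rating))
```

The join happens on the movie id, which is the key of both pair RDDs; everything before the join is plain per-record reshaping.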

Finally, here is the output from the above Spark program in HDFS (screenshot: movie-db-output).

Maximum temperature for year using Spark SQL

In the previous blog, we looked at how to find the maximum temperature of each year from the weather dataset. Below is the code for the same using Spark SQL, which is a layer on top of Spark. SQL on Spark used to be supported through Shark, which is being replaced by Spark SQL. Here is a nice blog from Databricks on the future of SQL on Spark.
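A minimal sketch of the idea, written against the current SparkSession API rather than the older SQLContext-style interface this post was originally written against; the paths, field offsets and validity check are assumptions.

```python
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("MaxTemperatureSparkSQL").getOrCreate()
sc = spark.sparkContext

def extract_data(line):
    """Return [Row(year, temperature)] for a valid record, [] otherwise."""
    year = line[15:19]
    temperature = int(line[87:92])
    quality = line[92:93]
    if temperature != 9999 and quality in "01459":
        return [Row(year=year, temperature=temperature)]
    return []

records = spark.createDataFrame(sc.textFile("hdfs:///user/hduser/weather")
                                  .flatMap(extract_data))
records.createOrReplaceTempView("weather")

max_temperature_per_year = spark.sql(
    "SELECT year, MAX(temperature) AS max_temperature FROM weather GROUP BY year")

print(max_temperature_per_year.count())

# Write the results back to HDFS as plain text, one "year\tmax_temperature" line per row.
(max_temperature_per_year.rdd
    .map(lambda row: "%s\t%d" % (row.year, row.max_temperature))
    .saveAsTextFile("hdfs:///user/hduser/max-temperature-sql-output"))
```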

Not sure why, but a lot of tasks are getting spawned and it is taking much more time than the previous program which doesn’t use SQL. Also, max_temperature_per_year.count() returns a proper value, but the output in HDFS has a lot of empty files along with the results spread across some of the files. The total number of files equals the number of tasks spawned in the executor.

Looks like a bug in Spark. If something has been missed in the above code, please let me know in the comments and the code will be updated accordingly.