In the previous blog, we looked at how to find out the maximum temperature of each year from the weather dataset. Below is the code for the same computation using Spark SQL, which is a layer on top of Spark. SQL on Spark was earlier supported through Shark, which is being replaced by Spark SQL. Here is a nice blog from Databricks on the future of SQL on Spark.
import re
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
#Function to extract the year, temperature and quality code from a line
#based on fixed-width positions, and filter out the invalid records
def extractData(line):
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    #+9999 marks a missing reading; [01459] are the valid quality codes
    if temp != "+9999" and re.match("[01459]", q):
        return [(year, temp)]
    return []
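To sanity-check the fixed-width slicing, here is a small standalone snippet run on a fabricated record. The padding characters and values below are made up purely to put the year, temperature, and quality code at the expected offsets; real NCDC records carry many more fields.

```python
# A fabricated 93-character record: positions 15-18 hold the year,
# positions 87-91 the signed temperature (tenths of a degree),
# and position 92 the quality code.
sample = "0" * 15 + "1950" + "9" * 68 + "+0022" + "1"

val = sample.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
print(year, int(temp), q)  # -> 1950 22 1
```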
logFile = "hdfs://localhost:9000/user/bigdatavm/input"
#Create Spark Context and SQL Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
sqlContext = SQLContext(sc)
#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)
#Transform the data to extract/filter and then map it to a row
temperature_data = weatherData.flatMap(extractData).map(lambda p: Row(year=p[0], temperature=int(p[1])))
#Infer the schema, and register the SchemaRDD as a table.
temperature_data = sqlContext.inferSchema(temperature_data)
temperature_data.registerTempTable("temperature_data")
#SQL can be run over SchemaRDDs that have been registered as a table.
#Filtering can be done in the SQL using a where clause or in a py function as done in the extractData()
max_temperature_per_year = sqlContext.sql("SELECT year, MAX(temperature) FROM temperature_data GROUP BY year")
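As the comment above notes, the quality filtering could instead be pushed into the SQL with a WHERE clause. A rough sketch of what that might look like (the table name raw_readings and its columns are assumptions; they would require registering the unfiltered rows as a table first, which the code above does not do):

```python
# Hypothetical variant: filter invalid readings in SQL rather than in extractData().
# Assumes the raw (year, temperature, quality) rows were registered as "raw_readings".
max_temperature_per_year = sqlContext.sql("""
    SELECT year, MAX(temperature)
    FROM raw_readings
    WHERE temperature != 9999 AND quality IN ('0', '1', '4', '5', '9')
    GROUP BY year
""")
```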
#Save the RDD back into HDFS (the output path here is an assumption)
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
Not sure why, but a lot of tasks are getting spawned and it is taking much more time than the previous program that did not use SQL. Also, max_temperature_per_year.count() returns the correct value, but the output in HDFS contains a lot of empty files along with the results scattered across some of the files. The total number of files equals the number of tasks spawned on the executors.
This looks like a bug in Spark, though it may simply be that Spark SQL uses a fixed default number of shuffle partitions for aggregations, so most partitions end up empty for a small dataset. If something has been missed in the above code, please let me know in the comments and the code will be updated accordingly.
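One way to avoid the flood of near-empty part files would be to collapse the result into a single partition before saving. A sketch (untested here; the output path is an assumption, and coalesce is a standard RDD operation that SchemaRDDs inherit):

```python
# Coalesce the aggregated result into one partition so HDFS gets a single
# part file instead of one (mostly empty) file per shuffle task.
single_partition = max_temperature_per_year.coalesce(1)
single_partition.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output_single")
```

Lowering the spark.sql.shuffle.partitions setting should have a similar effect on the number of tasks spawned by the GROUP BY.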