Uncategorized

SparkSQL Example

Download and Import Into HDFS
%sh
# Download the health-expenditure CSV, strip its header and empty rows, and
# load it into HDFS as /tmp/expenses.csv. A marker file
# (/tmp/au.dataset.downloaded) is created after a successful import so that
# re-running this paragraph skips the download.
if [ -e /tmp/au.dataset.downloaded ]
then
    echo 'Files already downloaded so skipping the download …'
    exit 0
fi

# remove existing copies of dataset from HDFS
# (-f: do not report an error when the file is not there yet, e.g. first run)
hdfs dfs -rm -f /tmp/expenses.csv

# fetch the dataset
# -f: fail on an HTTP error instead of saving the error page as if it were CSV
# -L: follow redirects
if ! curl -fL https://data.gov.au/dataset/f84b9baf-c1c1-437c-8c1e-654b2829848c/resource/88399d53-d55c-466c-8f4a-6cb965d24d6d/download/healthexpenditurebyareaandsource.csv -o /tmp/expenses.csv
then
    echo 'Download failed; nothing was imported into HDFS' >&2
    exit 1
fi

# clean the file in a single pass:
#   1d          remove header
#   s/,,,,,//g  remove empty fields
#   /^\s*$/d    remove lines that are (now) blank
sed -i -e '1d' -e 's/,,,,,//g' -e '/^\s*$/d' /tmp/expenses.csv

# put data into HDFS; only drop the local copy and write the marker once the
# upload has succeeded and the file is visible in HDFS
if hdfs dfs -put /tmp/expenses.csv /tmp && hdfs dfs -ls -h /tmp/expenses.csv
then
    rm /tmp/expenses.csv
    touch /tmp/au.dataset.downloaded
else
    echo 'Upload to HDFS failed; local copy kept at /tmp/expenses.csv' >&2
    exit 1
fi

Read into RDD and Count

// Wait for data to be downloaded: poll for the marker file
// /tmp/au.dataset.downloaded at most 50 times, 100 ms apart (about 5 s in
// total), then read the CSV regardless of whether the marker appeared.
import java.nio.file.{Paths, Files}

val downloadMarker = Paths.get("/tmp/au.dataset.downloaded")
var counter = 0
try {
  while (counter < 50 && !Files.exists(downloadMarker)) {
    println("sleeping for 100ms…")
    Thread.sleep(100)
    counter = counter + 1
  }
} catch {
  case _: InterruptedException => print("Thread interrupted")
}

// Make a timeout visible instead of silently reading a file that may be
// missing or incomplete.
if (!Files.exists(downloadMarker)) {
  println("WARNING: " + downloadMarker + " not found after waiting; /tmp/expenses.csv may be missing")
}

// Data downloaded: load the CSV as an RDD of lines, then show its size and
// first record.
val dataset = sc.textFile("/tmp/expenses.csv")
dataset.count()
dataset.first()

Register RDD as Table

// One record of the expenses CSV; the six fields are taken positionally from
// each comma-separated line.
case class Health (year: String, state: String, category:String, funding_src1: String, funding_scr2: String, spending: Integer)

// Split every line on commas and build a Health record from the first six
// fields. Lines with fewer than six fields are skipped: indexing them would
// throw ArrayIndexOutOfBoundsException and abort the whole job.
val health = dataset
  .map(line => line.split(","))
  .filter(fields => fields.length >= 6)
  .map(fields =>
    // trim so stray whitespace around the number does not break toInt
    Health(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5).trim.toInt)
  )

// toDF() is available from Spark 1.3.0 onwards.
// For spark 1.1.x and spark 1.2.x,
// use below instead:
// health.registerTempTable("health_table")
health.toDF().registerTempTable("health_table")

Spark – Read CSV, Infer Schema from Header

%spark
// Read the weather CSV with a header row, let spark-csv infer the column
// types, print the detected schema and row count, and register the result as
// the temp view "weather_temp".

// NOTE(review): the next line was a bare URL in the original paragraph, which
// is not valid Scala; kept as a comment. Presumably a service endpoint — confirm.
// https://130.162.78.175:1080
val Directory = "weather"

sqlContext.setConf("spark.sql.shuffle.partitions", "4")

//val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("swift://"+Container+".default/"+Directory+"/raw/201612-weather.csv")
//We will use the bdfs (alluxio) cached file system to access our object store data…
val wdf = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("bdfs://localhost:19998/" + Directory + "/raw/201612-weather.csv")

// If you get this error message:
// java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
// Then go to the Settings tab, then click on Notebook. Then restart the Notebook. This will restart your SparkContext

println("Here is the schema detected from the CSV")
wdf.printSchema()
println("..")

println("# of rows: %s".format(
  wdf.count()
))
println("..")

wdf.createOrReplaceTempView("weather_temp")
println("done")