Tuesday, July 4, 2017

Spark Dataframes and Datasets


Although RDDs are conceptually simple, all the new optimizations are going into DataFrames and Datasets. These are the notes I've made while upgrading my software from RDDs.

Parquet

Apache Parquet is a columnar storage format. It can sometimes lead to significantly improved performance, particularly with large data sets. Because Parquet carries a schema and stores each column's data contiguously rather than row by row, queries over roughly 400MB of data can come back in about a second.

You can convert CSV to Parquet with something like:

spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvInPath)
  .write.mode(saveMode)
  .parquet(parquetPath)
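
Because the data is laid out by column, a query that touches only a couple of columns reads only those column chunks from disk. A small sketch (the column names here are the hypothetical ones used later in this post):

spark.read
  .parquet(parquetPath)
  .select("cus_id_no", "prim_acc_no") // column pruning: only these column chunks are read
  .show(10)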

DataFrames

We can now read this in with something like:

import spark.implicits._ // for the $"..." column syntax

val df = spark.read.parquet(parquetPath)
df.withColumn("cus_id_no", $"cus_id_no".cast("bigint"))
  .filter($"prim_acc_no".isNotNull)
  .na.fill(0)  // default for null numeric columns
  .na.fill("") // default for null string columns

Here, we're also filtering and providing defaults.

Datasets

Datasets are type-safe. They can be derived from DataFrames with something like this:

val headerDS = headerDF.withColumn("COLUMN", UDF).withColumn( ... ).as[MyCaseClass]

and manipulated with something like this:

val ds      = df.as[CaseClass]
// groupByKey returns a KeyValueGroupedDataset; aggregating (here, a count) turns it back into a Dataset
val grouped = ds.groupByKey(_.cus_id_no).count().toDF("cus_id_no", "cnt")
val joined  = ds.joinWith(grouped, ds("cus_id_no") === grouped("cus_id_no"), "left_outer")
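
For these snippets to compile there needs to be a case class whose fields line up with the DataFrame's columns, and spark.implicits._ in scope to provide its Encoder. A minimal sketch; the field names and types are assumptions based on the columns used above:

// Hypothetical case class mirroring the columns used in these examples;
// spark.implicits._ supplies the Encoder[CaseClass] that .as[CaseClass] needs
case class CaseClass(cus_id_no: Long, prim_acc_no: String)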

A DataFrame is just a Dataset[Row] - see the type alias in the package object of org.apache.spark.sql:

type DataFrame = Dataset[Row]

Tungsten

"By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger data sets than the same hand-written aggregations" (from here).

Among Tungsten's clever features, it:

1. does not deserialize the whole object (very useful in operations like joins and groupByKey)

2. manages objects off-heap (see here); a configuration sketch follows.
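
Off-heap memory is not on by default: it has to be switched on and given a size explicitly. A minimal sketch, assuming a 2g budget (the values are illustrative, not recommendations):

import org.apache.spark.sql.SparkSession

// Enable off-heap memory for Tungsten and give it an explicit budget
val spark = SparkSession.builder()
  .appName("tungsten-off-heap")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()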

Catalyst

"At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer." from here.

The bad news

With these APIs, you have to accept a much less rich interface. For instance, groupByKey returns a KeyValueGroupedDataset, which has a limited set of functions: there is no filter, and mapping to any type that doesn't have an org.apache.spark.sql.Encoder associated with it leads to a compile-time error of "... Support for serializing other types will be added in future releases".
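
For example, mapping to a class with no built-in Encoder will not compile. One workaround is to supply an explicit Kryo encoder, at the cost of the data being stored as an opaque binary blob rather than in Tungsten's optimised format. A sketch, using a hypothetical Legacy class:

import org.apache.spark.sql.{Encoder, Encoders}

class Legacy(val id: String) // a hypothetical class with no built-in Encoder

// ds.map(r => new Legacy(r.cus_id_no.toString))  // does not compile: no Encoder[Legacy] in scope

// An explicit Kryo encoder makes it compile, but the rows become serialized blobs
implicit val legacyEncoder: Encoder[Legacy] = Encoders.kryo[Legacy]
val legacies = ds.map(r => new Legacy(r.cus_id_no.toString))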

You can always convert them to RDDs but then you lose all optimization benefits.

The difference

This takes 1.2 hours on Orbis data:

headerDS.rdd.map(x => x.BVDIDnumber -> Seq(x)).reduceByKey(_ ++ _).filter(_._2.size > 1).take(10)

whereas this takes 5 mins:

headerDF
  .groupByKey(_.getString(0))
  .flatMapGroups { case (key, iter) =>
    val ys = iter.toSeq
    if (ys.size > 1) Seq(ys.map(_.mkString(", "))) else Seq.empty
  }
  .take(10)

Aside: this idiom (flatMapping to either a singleton or an empty collection) reflects a monadic principle.

The filter method is completely described in one simple law:
FIL1. m filter p ≡ m flatMap {x => if(p(x)) unit(x) else mzero}

(from here).
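
In plain Scala collections the same law holds with List(_) as unit and Nil as mzero:

val xs = List(1, 2, 3, 4)

val viaFilter  = xs.filter(_ % 2 == 0)
val viaFlatMap = xs.flatMap(x => if (x % 2 == 0) List(x) else Nil) // unit = List(_), mzero = Nil

assert(viaFilter == viaFlatMap) // both are List(2, 4)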

Joins

If we want to optimize a join, we might want to repartition the DataFrame by the join key with something like this, so that rows with the same key end up in the same partition:

import org.apache.spark.sql.functions.col

// repartition takes Column arguments, hence col(FIELD_NAME)
aDataFrame.repartition(col(FIELD_NAME)).write.format("parquet").mode("overwrite").save(FILE_NAME)

Now, if we want to do a join, it looks a little like this:

aDataFrame.join(other, other("record_id") <=> $"record".getItem("record_id"))

where

other.printSchema
root
 |-- document_type: string (nullable = true)
 |-- record_id: string (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- entitySize: integer (nullable = true)

and

aDataFrame.printSchema
root
 |-- record: struct (nullable = true)
 |    |-- document_type: string (nullable = true)
 |    |-- record_id: string (nullable = true)
.
.
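
To see how Catalyst actually executes this join (which join strategy it picks and where the Exchange, i.e. shuffle, steps sit), we can again look at the plan:

val joined = aDataFrame.join(other, other("record_id") <=> $"record".getItem("record_id"))
joined.explain() // shows the chosen join strategy and any Exchange (shuffle) operators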

