TypedDataset: Feature Overview

This tutorial introduces TypedDataset using a simple example. The following imports are needed to make all code examples compile.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import frameless.functions.aggregate._
import frameless.TypedDataset

val conf = new SparkConf().setMaster("local[*]").setAppName("frameless repl").set("spark.ui.enabled", "false")
val spark = SparkSession.builder().config(conf).appName("REPL").getOrCreate()
implicit val sqlContext = spark.sqlContext
spark.sparkContext.setLogLevel("WARN")

import spark.implicits._

Creating TypedDataset instances

We start by defining a case class:

case class Apartment(city: String, surface: Int, price: Double)

And few Apartment instances:

val apartments = Seq(
  Apartment("Paris", 50, 300000.0),
  Apartment("Paris", 100, 450000.0),
  Apartment("Paris", 25, 250000.0),
  Apartment("Lyon", 83, 200000.0),
  Apartment("Lyon", 45, 133000.0),
  Apartment("Nice", 74, 325000.0)
)

We are now ready to instantiate a TypedDataset[Apartment]:

val aptTypedDs = TypedDataset.create(apartments)
// aptTypedDs: frameless.TypedDataset[Apartment] = [city: string, surface: int ... 1 more field]

We can also create one from an existing Spark Dataset:

val aptDs = spark.createDataset(apartments)
// aptDs: org.apache.spark.sql.Dataset[Apartment] = [city: string, surface: int ... 1 more field]

val aptTypedDs = TypedDataset.create(aptDs)
// aptTypedDs: frameless.TypedDataset[Apartment] = [city: string, surface: int ... 1 more field]

Or use the Frameless syntax:

import frameless.syntax._
// import frameless.syntax._

val aptTypedDs2 = aptDs.typed
// aptTypedDs2: frameless.TypedDataset[Apartment] = [city: string, surface: int ... 1 more field]

Typesafe column referencing

This is how we select a particular column from a TypedDataset:

val cities: TypedDataset[String] = aptTypedDs.select(aptTypedDs('city))
// cities: frameless.TypedDataset[String] = [_1: string]

This is completely type-safe, for instance suppose we misspell city as citi:

aptTypedDs.select(aptTypedDs('citi))
// <console>:28: error: No column Symbol with shapeless.tag.Tagged[String("citi")] of type A in Apartment
//        aptTypedDs.select(aptTypedDs('citi))
//                                    ^

This gets raised at compile-time, whereas with the standard Dataset API the error appears at run-time (enjoy the stack trace):

aptDs.select('citi)
// org.apache.spark.sql.AnalysisException: cannot resolve '`citi`' given input columns: [city, surface, price];;
// 'Project ['citi]
// +- LocalRelation [city#206, surface#207, price#208]
// 
//   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
//   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
//   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
//   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
//   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
//   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
//   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
//   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
//   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
//   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
//   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
//   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
//   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
//   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
//   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
//   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
//   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
//   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
//   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
//   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
//   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
//   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
//   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
//   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
//   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
//   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
//   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
//   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
//   at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
//   ... 458 elided

select() supports arbitrary column operations:

aptTypedDs.select(aptTypedDs('surface) * 10, aptTypedDs('surface) + 2).show().run()
// +----+---+
// |  _1| _2|
// +----+---+
// | 500| 52|
// |1000|102|
// | 250| 27|
// | 830| 85|
// | 450| 47|
// | 740| 76|
// +----+---+
//

Note that unlike the standard Spark API where some operations are lazy and some are not, TypedDatasets have all operations to be lazy. In the above example, show() is lazy. It requires to apply run() for the show job to materialize. A more detailed explanation of Job is given here.

Next we compute the price by surface unit:

val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
// <console>:27: error: overloaded method value / with alternatives:
//   (u: Double)(implicit n: frameless.CatalystNumeric[Double])frameless.TypedColumn[Apartment,Double] <and>
//   (u: frameless.TypedColumn[Apartment,Double])(implicit n: frameless.CatalystNumeric[Double])frameless.TypedColumn[Apartment,Double]
//  cannot be applied to (frameless.TypedColumn[Apartment,Int])
//        val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
//                                                                      ^

As the error suggests, we can't divide a TypedColumn of Double by Int. For safety, in Frameless only math operations between same types is allowed. There are two ways to proceed here:

(a) Explicitly cast Int to Double (manual)

val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
// priceBySurfaceUnit: frameless.TypedDataset[Double] = [_1: double]

priceBySurfaceUnit.collect().run()
// res6: Seq[Double] = WrappedArray(6000.0, 4500.0, 10000.0, 2409.6385542168673, 2955.5555555555557, 4391.891891891892)

(b) Perform the cast implicitly (automated)

import frameless.implicits.widen._
// import frameless.implicits.widen._

val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
// priceBySurfaceUnit: frameless.TypedDataset[Double] = [_1: double]

priceBySurfaceUnit.collect.run()
// res7: Seq[Double] = WrappedArray(6000.0, 4500.0, 10000.0, 2409.6385542168673, 2955.5555555555557, 4391.891891891892)

Looks like it worked, but that cast seems unsafe right? Actually it is safe. Let's try to cast a TypedColumn of String to Double:

aptTypedDs('city).cast[Double]
// <console>:31: error: could not find implicit value for parameter c: frameless.CatalystCast[String,Double]
//        aptTypedDs('city).cast[Double]
//                              ^

The compile-time error tells us that to perform the cast, an evidence (in the form of CatalystCast[String, Double]) must be available. Since casting from String to Double is not allowed, this results in a compilation error.

Check here for the set of available CatalystCast.

TypeSafe TypedDataset casting and projections

With select() the resulting TypedDataset is of type TypedDataset[TupleN[...]] (with N in [1...10]). For example, if we select three columns with types String, Int, and Boolean the result will have type TypedDataset[(String, Int, Boolean)]. To select more than ten columns use the selectMany() method. Select has better IDE support than the macro based selectMany, so prefer select() for the general case.

We often want to give more expressive types to the result of our computations. as[T] allows us to safely cast a TypedDataset[U] to another of type TypedDataset[T] as long as the types in U and T align.

When the cast is valid the expression compiles:

case class UpdatedSurface(city: String, surface: Int)
// defined class UpdatedSurface

val updated = aptTypedDs.select(aptTypedDs('city), aptTypedDs('surface) + 2).as[UpdatedSurface]
// updated: frameless.TypedDataset[UpdatedSurface] = [city: string, surface: int]

updated.show(2).run()
// +-----+-------+
// | city|surface|
// +-----+-------+
// |Paris|     52|
// |Paris|    102|
// +-----+-------+
// only showing top 2 rows
//

Next we try to cast a (String, String) to an UpdatedSurface (which has types String, Int). The cast is not valid and the expression does not compile:

aptTypedDs.select(aptTypedDs('city), aptTypedDs('city)).as[UpdatedSurface]
// <console>:33: error: could not find implicit value for parameter as: frameless.ops.As[(String, String),UpdatedSurface]
//        aptTypedDs.select(aptTypedDs('city), aptTypedDs('city)).as[UpdatedSurface]
//                                                                  ^

Projections

We often want to work with a subset of the fields in a dataset. Projections allows to easily select the fields we are interested while preserving their initial name and types for extra safety.

Here is an example using the TypedDataset[Apartment] with an additional column:

import frameless.implicits.widen._
// import frameless.implicits.widen._

val aptds = aptTypedDs // For shorter expressions
// aptds: frameless.TypedDataset[Apartment] = [city: string, surface: int ... 1 more field]

case class ApartmentDetails(city: String, price: Double, surface: Int, ratio: Double)
// defined class ApartmentDetails

val aptWithRatio = aptds.select(aptds('city), aptds('price), aptds('surface), aptds('price) / aptds('surface)).as[ApartmentDetails]
// aptWithRatio: frameless.TypedDataset[ApartmentDetails] = [city: string, price: double ... 2 more fields]

Suppose we only want to work with city and ratio:

case class CityInfo(city: String, ratio: Double)
// defined class CityInfo

val cityRatio = aptWithRatio.project[CityInfo]
// cityRatio: frameless.TypedDataset[CityInfo] = [city: string, ratio: double]

cityRatio.show(2).run()
// +-----+------+
// | city| ratio|
// +-----+------+
// |Paris|6000.0|
// |Paris|4500.0|
// +-----+------+
// only showing top 2 rows
//

Suppose we only want to work with price and ratio:

case class PriceInfo(ratio: Double, price: Double)
// defined class PriceInfo

val priceInfo = aptWithRatio.project[PriceInfo]
// priceInfo: frameless.TypedDataset[PriceInfo] = [ratio: double, price: double]

priceInfo.show(2).run()
// +------+--------+
// | ratio|   price|
// +------+--------+
// |6000.0|300000.0|
// |4500.0|450000.0|
// +------+--------+
// only showing top 2 rows
//

We see that the order of the fields does not matter as long as the names and the corresponding types agree. However, if we make a mistake in any of the names and/or their types, then we get a compilation error.

Say we make a typo in a field name:

case class PriceInfo2(ratio: Double, pricEE: Double)
aptWithRatio.project[PriceInfo2]
// <console>:36: error: Cannot prove that ApartmentDetails can be projected to PriceInfo2. Perhaps not all member names and types of PriceInfo2 are the same in ApartmentDetails?
//        aptWithRatio.project[PriceInfo2]
//                            ^

Say we make a mistake in the corresponding type:

case class PriceInfo3(ratio: Int, price: Double) // ratio should be Double
aptWithRatio.project[PriceInfo3]
// <console>:36: error: Cannot prove that ApartmentDetails can be projected to PriceInfo3. Perhaps not all member names and types of PriceInfo3 are the same in ApartmentDetails?
//        aptWithRatio.project[PriceInfo3]
//                            ^

User Defined Functions

Frameless supports lifting any Scala function (up to five arguments) to the context of a particular TypedDataset:

// The function we want to use as UDF
val priceModifier =
    (name: String, price:Double) => if(name == "Paris") price * 2.0 else price
// priceModifier: (String, Double) => Double = <function2>

val udf = aptTypedDs.makeUDF(priceModifier)
// udf: (frameless.TypedColumn[Apartment,String], frameless.TypedColumn[Apartment,Double]) => frameless.TypedColumn[Apartment,Double] = <function2>

val aptds = aptTypedDs // For shorter expressions
// aptds: frameless.TypedDataset[Apartment] = [city: string, surface: int ... 1 more field]

val adjustedPrice = aptds.select(aptds('city), udf(aptds('city), aptds('price)))
// adjustedPrice: frameless.TypedDataset[(String, Double)] = [_1: string, _2: double]

adjustedPrice.show().run()
// +-----+--------+
// |   _1|      _2|
// +-----+--------+
// |Paris|600000.0|
// |Paris|900000.0|
// |Paris|500000.0|
// | Lyon|200000.0|
// | Lyon|133000.0|
// | Nice|325000.0|
// +-----+--------+
//

GroupBy and Aggregations

Let's suppose we wanted to retrieve the average apartment price in each city

val priceByCity = aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('price)))
// priceByCity: frameless.TypedDataset[(String, Double)] = [_1: string, _2: double]

priceByCity.collect().run()
// res17: Seq[(String, Double)] = WrappedArray((Nice,325000.0), (Paris,333333.3333333333), (Lyon,166500.0))

Again if we try to aggregate a column that can't be aggregated, we get a compilation error

aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))                                                         ^
// <console>:34: error: could not find implicit value for parameter averageable: frameless.CatalystAverageable[String,Out]
//        aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))                                                         ^
//                                                     ^
// <console>:34: warning: postfix operator ^ should be enabled
// by making the implicit value scala.language.postfixOps visible.
// This can be achieved by adding the import clause 'import scala.language.postfixOps'
// or by setting the compiler option -language:postfixOps.
// See the Scaladoc for value scala.language.postfixOps for a discussion
// why the feature should be explicitly enabled.
//        aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))                                                         ^
//                                                                                                                                  ^

Next, we combine select and groupBy to calculate the average price/surface ratio per city:

val aptds = aptTypedDs // For shorter expressions
// aptds: frameless.TypedDataset[Apartment] = [city: string, surface: int ... 1 more field]

val cityPriceRatio =  aptds.select(aptds('city), aptds('price) / aptds('surface))
// cityPriceRatio: frameless.TypedDataset[(String, Double)] = [_1: string, _2: double]

cityPriceRatio.groupBy(cityPriceRatio('_1)).agg(avg(cityPriceRatio('_2))).show().run()
// +-----+------------------+
// |   _1|                _2|
// +-----+------------------+
// | Nice| 4391.891891891892|
// |Paris| 6833.333333333333|
// | Lyon|2682.5970548862115|
// +-----+------------------+
//

Entire TypedDataset Aggregation

We often want to aggregate the entire TypedDataset and skip the groupBy() clause. In Frameless you can do this using the agg() operator directly on the TypedDataset. In the following example, we compute the average price, the average surface,
the minimum surface, and the set of cities for the entire dataset.

case class Stats(
   avgPrice: Double, 
   avgSurface: Double, 
   minSurface: Int, 
   allCities: Vector[String])
// defined class Stats

aptds.agg(
   avg(aptds('price)), 
   avg(aptds('surface)),
   min(aptds('surface)),
   collectSet(aptds('city))
).as[Stats].show().run() 
// +-----------------+------------------+----------+-------------------+
// |         avgPrice|        avgSurface|minSurface|          allCities|
// +-----------------+------------------+----------+-------------------+
// |276333.3333333333|62.833333333333336|        25|[Paris, Nice, Lyon]|
// +-----------------+------------------+----------+-------------------+
//

Joins

case class CityPopulationInfo(name: String, population: Int)

val cityInfo = Seq(
  CityPopulationInfo("Paris", 2229621),
  CityPopulationInfo("Lyon", 500715),
  CityPopulationInfo("Nice", 343629)
)

val citiInfoTypedDS = TypedDataset.create(cityInfo)

Here is how to join the population information to the apartment's dataset.

val withCityInfo = aptTypedDs.join(citiInfoTypedDS, aptTypedDs('city), citiInfoTypedDS('name))
// withCityInfo: frameless.TypedDataset[(Apartment, CityPopulationInfo)] = [_1: struct<city: string, surface: int ... 1 more field>, _2: struct<name: string, population: int>]

withCityInfo.show().run()
// +--------------------+---------------+
// |                  _1|             _2|
// +--------------------+---------------+
// | [Paris,50,300000.0]|[Paris,2229621]|
// |[Paris,100,450000.0]|[Paris,2229621]|
// | [Paris,25,250000.0]|[Paris,2229621]|
// |  [Lyon,83,200000.0]|  [Lyon,500715]|
// |  [Lyon,45,133000.0]|  [Lyon,500715]|
// |  [Nice,74,325000.0]|  [Nice,343629]|
// +--------------------+---------------+
//

The joined TypedDataset has type TypedDataset[(Apartment, CityPopulationInfo)].

We can then select which information we want to continue to work with:

case class AptPriceCity(city: String, aptPrice: Double, cityPopulation: Int)
// defined class AptPriceCity

withCityInfo.select(
   withCityInfo.colMany('_2, 'name), withCityInfo.colMany('_1, 'price), withCityInfo.colMany('_2, 'population)
).as[AptPriceCity].show().run
// +-----+--------+--------------+
// | city|aptPrice|cityPopulation|
// +-----+--------+--------------+
// |Paris|300000.0|       2229621|
// |Paris|450000.0|       2229621|
// |Paris|250000.0|       2229621|
// | Lyon|200000.0|        500715|
// | Lyon|133000.0|        500715|
// | Nice|325000.0|        343629|
// +-----+--------+--------------+
//

results matching ""

    No results matching ""