Typed Encoders in Frameless

Spark uses reflection to derive its Encoders, which is why they can fail at run time. For example, because Spark does not support java.util.Date, the following leads to an error:

import org.apache.spark.sql.Dataset
import spark.implicits._

case class DateRange(s: java.util.Date, e: java.util.Date)
scala> val ds: Dataset[DateRange] = sqlContext.createDataset(Seq(DateRange(new java.util.Date, new java.util.Date)))
java.lang.UnsupportedOperationException: No Encoder found for java.util.Date
- field (class: "java.util.Date", name: "s")
- root class: "DateRange"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:355)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
  ... 230 elided

As shown by the stack trace, this runtime error goes through ScalaReflection to try to derive an Encoder for the Dataset schema. Besides the annoyance of not detecting this error at compile time, a more important limitation of the reflection-based approach is its inability to be extended for custom types. See this Stack Overflow question for a summary of the current situation (as of 2.0) in vanilla Spark: How to store custom objects in a Dataset?.

Frameless introduces a new type class called TypedEncoder to solve these issues. TypedEncoders are passed around as implicit parameters to every frameless method to ensure that the data being manipulated is encodable. Standard implicit resolution, coupled with shapeless's type class derivation mechanism, ensures that every piece of code that compiles only manipulates encodable data. For example, the java.util.Date example won't compile with frameless:

import frameless.TypedDataset
val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(new java.util.Date, new java.util.Date)))
// <console>:26: error: could not find implicit value for parameter encoder: frameless.TypedEncoder[DateRange]
//        val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(new java.util.Date, new java.util.Date)))
//                                                             ^

Type class derivation takes care of recursively constructing (and proving the existence of) TypedEncoders for case classes. The following works as expected:

case class Bar(d: Double, s: String)
// defined class Bar

case class Foo(i: Int, b: Bar)
// defined class Foo

val ds: TypedDataset[Foo] = TypedDataset.create(Seq(Foo(1, Bar(1.1, "s"))))
// ds: frameless.TypedDataset[Foo] = [i: int, b: struct<d: double, s: string>]

ds.collect()
// res2: frameless.Job[Seq[Foo]] = frameless.Job$$anon$4@8a0bd3c
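Note that collect() does not return the data directly: it returns a frameless Job describing the Spark action, which is only executed when run. A minimal sketch, assuming a local SparkSession is available implicitly (the master URL here is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// An implicit SparkSession is required to run a Job.
implicit val spark: SparkSession =
  SparkSession.builder().master("local[*]").getOrCreate()

// collect() merely describes the action; run() triggers it.
val foos: Seq[Foo] = ds.collect().run()
```

Deferring execution behind Job lets actions be composed and scheduled explicitly instead of firing as a side effect.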

But any non-encodable type in the case class hierarchy will be detected at compile time:

case class BarDate(d: Double, s: String, t: java.util.Date)
case class FooDate(i: Int, b: BarDate)
val ds: TypedDataset[FooDate] = TypedDataset.create(Seq(FooDate(1, BarDate(1.1, "s", new java.util.Date))))
// <console>:28: error: could not find implicit value for parameter encoder: frameless.TypedEncoder[FooDate]
//        val ds: TypedDataset[FooDate] = TypedDataset.create(Seq(FooDate(1, BarDate(1.1, "s", new java.util.Date))))
//                                                           ^

It should be noted that once derived, reflection-based Encoders and implicitly derived TypedEncoders have identical performance. The derivation mechanisms are different, but the objects generated to encode and decode JVM objects into Spark's internal representation behave the same at runtime.
