Using Cats with RDDs
Data aggregation is one of the most important operations when working with Spark (and data in general).
For example, we often have to compute the min, max, avg, etc. over a set of columns grouped by
different predicates. This section shows how Cats simplifies these tasks in Spark by
leveraging its large collection of type classes for ordering and aggregating data.
All the examples below assume you have previously imported cats.implicits and that sc is a running SparkContext, with org.apache.spark.rdd.RDD in scope.
import cats.implicits._
// import cats.implicits._
Cats offers ways to sort and aggregate tuples of arbitrary arity.
import frameless.cats.implicits._
// import frameless.cats.implicits._
val data: RDD[(Int, Int, Int)] = sc.makeRDD((1, 2, 3) :: (1, 5, 3) :: (8, 2, 3) :: Nil)
// data: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21
println(data.csum)
// (10,9,9)
println(data.cmax)
// (8,2,3)
println(data.cmin)
// (1,2,3)
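These helpers come from the standard Cats type class instances for tuples: csum combines element-wise with the tuple Monoid, while cmax and cmin pick the largest and smallest tuple according to the tuple Order (note that cmax above returned (8,2,3), the largest tuple, not the element-wise maximum). With the same imports in scope, the instances can be exercised outside Spark; triples below is just an illustrative name:
val triples = List((1, 2, 3), (1, 5, 3), (8, 2, 3))
// Monoid[(Int, Int, Int)] sums element-wise, mirroring csum
triples.combineAll
// (10,9,9)
// Order[(Int, Int, Int)] compares lexicographically, mirroring cmax and cmin
triples.maximumOption
// Some((8,2,3))
triples.minimumOption
// Some((1,2,3))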
The following example aggregates all the elements with a common key.
type User = String
// defined type alias User
type TransactionCount = Int
// defined type alias TransactionCount
val allData: RDD[(User,TransactionCount)] =
sc.makeRDD(("Bob", 12) :: ("Joe", 1) :: ("Anna", 100) :: ("Bob", 20) :: ("Joe", 2) :: Nil)
// allData: org.apache.spark.rdd.RDD[(User, TransactionCount)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
val totalPerUser = allData.csumByKey
// totalPerUser: org.apache.spark.rdd.RDD[(User, TransactionCount)] = ShuffledRDD[2] at reduceByKey at implicits.scala:18
totalPerUser.collectAsMap
// res7: scala.collection.Map[User,TransactionCount] = Map(Bob -> 32, Joe -> 3, Anna -> 100)
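For comparison, the same aggregation can be spelled out by hand with plain Spark and the Cats Semigroup syntax. This is only a sketch of what csumByKey saves you from writing, and totalPerUserByHand is a hypothetical name:
// Reduce the values of each key with the Semigroup combine (|+|) of the
// value type; for Int this is ordinary addition.
val totalPerUserByHand: RDD[(User, TransactionCount)] = allData.reduceByKey(_ |+| _)
totalPerUserByHand.collectAsMap
// Map(Bob -> 32, Joe -> 3, Anna -> 100)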
The same example would work for more complex values.
val allDataComplexValues =
  sc.makeRDD( ("Bob", Map("task1" -> 10)) ::
              ("Joe", Map("task1" -> 1, "task2" -> 3)) ::
              ("Bob", Map("task1" -> 10, "task2" -> 1)) ::
              ("Joe", Map("task3" -> 4)) :: Nil )
// allDataComplexValues: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,Int])] = ParallelCollectionRDD[3] at makeRDD at <console>:22
val overallTasksPerUser = allDataComplexValues.csumByKey
// overallTasksPerUser: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,Int])] = ShuffledRDD[4] at reduceByKey at implicits.scala:18
overallTasksPerUser.collectAsMap
// res8: scala.collection.Map[String,scala.collection.immutable.Map[String,Int]] = Map(Bob -> Map(task1 -> 20, task2 -> 1), Joe -> Map(task1 -> 1, task2 -> 3, task3 -> 4))
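The per-user maps are merged by the Monoid for Map, which unions the entries and combines the values of colliding keys (here with Int addition). The merge behind Bob's result can be reproduced directly in plain Cats:
Map("task1" -> 10) |+| Map("task1" -> 10, "task2" -> 1)
// Map(task1 -> 20, task2 -> 1)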
Joins
// Type aliases for meaningful types
type TimeSeries = Map[Int,Int]
// defined type alias TimeSeries
type UserName = String
// defined type alias UserName
Example: Using the implicit full-outer-join operator
import frameless.cats.outer._
// import frameless.cats.outer._
val day1: RDD[(UserName,TimeSeries)] = sc.makeRDD( ("John", Map(0 -> 2, 1 -> 4)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Sam", Map(0 -> 1)) :: Nil )
// day1: org.apache.spark.rdd.RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[5] at makeRDD at <console>:26
val day2: RDD[(UserName,TimeSeries)] = sc.makeRDD( ("John", Map(0 -> 10, 1 -> 11)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Joe", Map(0 -> 1, 1 -> 2)) :: Nil )
// day2: org.apache.spark.rdd.RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[6] at makeRDD at <console>:26
val daysCombined = day1 |+| day2
// daysCombined: org.apache.spark.rdd.RDD[(UserName, TimeSeries)] = MapPartitionsRDD[10] at mapValues at implicits.scala:43
daysCombined.collect()
// res10: Array[(UserName, TimeSeries)] = Array((Joe,Map(0 -> 1, 1 -> 2)), (Sam,Map(0 -> 1)), (Chris,Map(0 -> 2, 1 -> 4)), (John,Map(0 -> 12, 1 -> 15)))
Note how each user's time series from different days have been aggregated together.
The |+| (Semigroup) operator for key-value pair RDDs performs a full outer join
on the key and combines values using the default Semigroup for the value type.
In Cats:
Map(1 -> 2, 2 -> 3) |+| Map(1 -> 4, 2 -> -1)
// res11: Map[Int,Int] = Map(1 -> 6, 2 -> 2)
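As a rough mental model, the pair-RDD |+| used above can be approximated by hand with plain Spark operations: a full outer join on the key, followed by combining the two optional sides with the Semigroup for Option. This is only an illustrative sketch, not the actual frameless implementation, and daysCombinedByHand is a hypothetical name:
val daysCombinedByHand: RDD[(UserName, TimeSeries)] =
  day1.fullOuterJoin(day2).mapValues {
    // Some |+| Some merges the two maps; Some |+| None keeps the side that exists
    case (left, right) => (left |+| right).getOrElse(Map.empty[Int, Int])
  }
Collecting daysCombinedByHand yields the same aggregated time series as daysCombined above.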