Using Cats with RDDs

Data aggregation is one of the most important operations when working with Spark (and data in general). For example, we often have to compute the min, max, avg, etc. of a set of columns grouped by different predicates. This section shows how Cats simplifies these tasks in Spark by leveraging its large collection of type classes for ordering and aggregating data.

All the examples below assume the following import is in scope:

import cats.implicits._
// import cats.implicits._
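
They also assume a SparkContext named sc is available. A minimal sketch of such a setup (the local master and app name here are illustrative placeholders, not something the examples depend on):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// A local SparkContext is enough to run the snippets in this section.
val conf = new SparkConf().setMaster("local[*]").setAppName("frameless-cats-examples")
val sc: SparkContext = new SparkContext(conf)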

The frameless.cats module offers ways to order and aggregate tuples of arbitrary arity, building on the Order and Monoid type classes from Cats.

import frameless.cats.implicits._
// import frameless.cats.implicits._

val data: RDD[(Int, Int, Int)] = sc.makeRDD((1, 2, 3) :: (1, 5, 3) :: (8, 2, 3) :: Nil)
// data: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21

println(data.csum)
// (10,9,9)

println(data.cmax)
// (8,2,3)

println(data.cmin)
// (1,2,3)
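
These results can be sanity-checked locally with plain Cats (no Spark needed), since the RDD operations fold the data with the corresponding type class instances for tuples. A small sketch using combineAll, which reduces a collection through its Monoid:

import cats.implicits._

// Tuples of Ints combine element-wise, mirroring what csum
// computes across the RDD's partitions.
List((1, 2, 3), (1, 5, 3), (8, 2, 3)).combineAll
// (10,9,9)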

The following example aggregates all the elements with a common key.

type User = String
// defined type alias User

type TransactionCount = Int
// defined type alias TransactionCount

val allData: RDD[(User,TransactionCount)] =
   sc.makeRDD(("Bob", 12) :: ("Joe", 1) :: ("Anna", 100) :: ("Bob", 20) :: ("Joe", 2) :: Nil)
// allData: org.apache.spark.rdd.RDD[(User, TransactionCount)] = ParallelCollectionRDD[1] at makeRDD at <console>:24

val totalPerUser = allData.csumByKey
// totalPerUser: org.apache.spark.rdd.RDD[(User, TransactionCount)] = ShuffledRDD[2] at reduceByKey at implicits.scala:18

totalPerUser.collectAsMap
// res7: scala.collection.Map[User,TransactionCount] = Map(Bob -> 32, Joe -> 3, Anna -> 100)
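
As the reduceByKey in the lineage above hints, csumByKey behaves like Spark's reduceByKey with the Semigroup combine of the value type. A hand-rolled sketch of the equivalent (totalPerUserManual is an illustrative name, not part of the API):

// Reduce the values for each key with |+| (plain Int addition here).
val totalPerUserManual = allData.reduceByKey(_ |+| _)

totalPerUserManual.collectAsMap
// Map(Bob -> 32, Joe -> 3, Anna -> 100)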

The same example works with more complex value types. Here the values are Maps, and the Map Semigroup merges the per-task counts.

val allDataComplexKey =
   sc.makeRDD( ("Bob", Map("task1" -> 10)) ::
    ("Joe", Map("task1" -> 1, "task2" -> 3)) :: ("Bob", Map("task1" -> 10, "task2" -> 1)) :: ("Joe", Map("task3" -> 4)) :: Nil )
// allDataComplexKey: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,Int])] = ParallelCollectionRDD[3] at makeRDD at <console>:22

val overallTasksPerUser = allDataComplexKey.csumByKey
// overallTasksPerUser: org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Map[String,Int])] = ShuffledRDD[4] at reduceByKey at implicits.scala:18

overallTasksPerUser.collectAsMap
// res8: scala.collection.Map[String,scala.collection.immutable.Map[String,Int]] = Map(Bob -> Map(task1 -> 20, task2 -> 1), Joe -> Map(task1 -> 1, task2 -> 3, task3 -> 4))
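
The merging of Bob's two maps relies on Cats' Semigroup for Map, which unions the keys and combines values that collide. Checked locally:

import cats.implicits._

// Colliding keys ("task1") have their counts added; the rest pass through.
Map("task1" -> 10) |+| Map("task1" -> 10, "task2" -> 1)
// Map(task1 -> 20, task2 -> 1)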

Joins

// Type aliases to give the data meaningful names
type TimeSeries = Map[Int,Int]
// defined type alias TimeSeries

type UserName = String
// defined type alias UserName

Example: using the implicit full-outer-join operator

import frameless.cats.outer._
// import frameless.cats.outer._

val day1: RDD[(UserName,TimeSeries)] = sc.makeRDD( ("John", Map(0 -> 2, 1 -> 4)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Sam", Map(0 -> 1)) :: Nil )
// day1: org.apache.spark.rdd.RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[5] at makeRDD at <console>:26

val day2: RDD[(UserName,TimeSeries)] = sc.makeRDD( ("John", Map(0 -> 10, 1 -> 11)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Joe", Map(0 -> 1, 1 -> 2)) :: Nil )
// day2: org.apache.spark.rdd.RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[6] at makeRDD at <console>:26

val daysCombined = day1 |+| day2
// daysCombined: org.apache.spark.rdd.RDD[(UserName, TimeSeries)] = MapPartitionsRDD[10] at mapValues at implicits.scala:43

daysCombined.collect()
// res10: Array[(UserName, TimeSeries)] = Array((Joe,Map(0 -> 1, 1 -> 2)), (Sam,Map(0 -> 1)), (Chris,Map(0 -> 2, 1 -> 4)), (John,Map(0 -> 12, 1 -> 15)))

Note how each user's time series from the two days has been aggregated together. For key-value pair RDDs, the |+| (Semigroup) operator executes a full outer join on the keys and combines the values using the default Semigroup for the value type.

In plain Cats, the same Semigroup combines maps:

Map(1 -> 2, 2 -> 3) |+| Map(1 -> 4, 2 -> -1)
// res11: Map[Int,Int] = Map(1 -> 6, 2 -> 2)
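
A rough sketch of the same join semantics written with plain Spark operations (the actual frameless.cats.outer implementation may differ in its details; daysCombinedManual is an illustrative name):

// Full outer join, then merge the two optional sides through the
// Semigroup of Option[TimeSeries]; keys present on only one side
// pass through unchanged.
val daysCombinedManual = day1.fullOuterJoin(day2).mapValues {
  case (left, right) => (left |+| right).getOrElse(Map.empty)
}

daysCombinedManual.collect()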
