Job[A]

All operations on TypedDataset are lazy. An operation either returns a new transformed TypedDataset or a Job[A], where A is the result of running a non-lazy computation in Spark. Job serves several functions:

  • Makes all operations on a TypedDataset lazy, which makes them more predictable compared to having few operations being lazy and other being strict
  • Allows the programmer to make expensive blocking operations explicit
  • Allows for Spark jobs to be lazily sequenced using monadic composition via for-comprehension
  • Provides an obvious place where you can annotate/name your Spark jobs to make it easier to track different parts of your application in the Spark UI

The toy example showcases the use of for-comprehension to explicitly sequences Spark Jobs. First we calculate the size of the TypedDataset and then we collect to the driver exactly 20% of its elements:

val ds = TypedDataset.create(1 to 20)
// ds: frameless.TypedDataset[Int] = [_1: int]

val countAndTakeJob =
  for {
    count <- ds.count()
    sample <- ds.take((count/5).toInt)
  } yield sample
// countAndTakeJob: frameless.Job[Seq[Int]] = frameless.Job$$anon$3@1052ec89

countAndTakeJob.run()
// res1: Seq[Int] = WrappedArray(1, 2, 3, 4)

The countAndTakeJob can either be executed using run() (as we show above) or it can be passed along to other parts of the program to be further composed into more complex sequences of Spark jobs.

import frameless.Job
// import frameless.Job

def computeMinOfSample(sample: Job[Seq[Int]]): Job[Int] = sample.map(_.min)
// computeMinOfSample: (sample: frameless.Job[Seq[Int]])frameless.Job[Int]

val finalJob = computeMinOfSample(countAndTakeJob)
// finalJob: frameless.Job[Int] = frameless.Job$$anon$2@2e359e76

Now we can execute this new job by specifying a group-id and a description. This allows the programmer to see this information on the Spark UI and help track, say, performance issues.

finalJob.
  withGroupId("samplingJob").
  withDescription("Samples 20% of elements and computes the min").
  run()
// res2: Int = 1

results matching ""

    No results matching ""