Skip to content

Commit

Permalink
[RUNTIME] Make the vals in ParallelizedDataBag lazily collected to th…
Browse files Browse the repository at this point in the history
…e driver.
  • Loading branch information
ggevay committed Aug 25, 2016
1 parent 2b25f2a commit cedbe42
Showing 1 changed file with 39 additions and 5 deletions.
44 changes: 39 additions & 5 deletions emma-common/src/main/scala/eu/stratosphere/emma/api/DataBag.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package eu.stratosphere.emma.api

import java.io.IOException

import eu.stratosphere.emma.runtime.logger

import scala.language.experimental.macros

/**
Expand Down Expand Up @@ -153,17 +157,47 @@ sealed abstract class DataBag[+A] extends Serializable {
/**
* A DataBag backed by a parallel representation that can be evaluated in order to materialize the underlying values.
*
* Note: We want to avoid calling eval, unless we need to. We need to call it if the user
* 1. calls fetch()
* 2. uses the DataBag outside of the parallelize block
* 3. uses the DataBag in a higher-order context
* In the third case, the ParallelizedDataBag gets serialized and sent to the cluster. This wouldn't work
* if we just sent the repr, because RDD or DataSet operations (like .collect()) can't be called from UDFs.
* So, in this case, we detect that we are being serialized and call eval beforehand (and do not ship the repr).
*
* @param name The name identifying the backing parallel representation.
* @param repr The parallel representation for this bag.
* @param vals The method that evaluates the parallel representation and fetches the values as a Seq[A].
* @param eval The method that evaluates the parallel representation and fetches the values as a Seq[A].
* @tparam A The element type of this bag.
* @tparam R The type of the parallel representation.
*/
// FIXME: can we compute `vals` lazily without breaking serialization?
sealed class ParallelizedDataBag[A, R] private[api](
@transient val name: String,
@transient val repr: R,
private[emma] val vals: Seq[A]) extends DataBag[A]
val name: String,
@transient val repr: R,
eval: => Seq[A]) extends DataBag[A] with java.io.Serializable {

// Cache for the materialized elements. Starts out as null (the default value for a reference type)
// and is assigned by collectVals() the first time the values are needed.
// NOTE(review): this field is not @transient, so it is presumably the state that travels with the
// bag when it is serialized — confirm against the serialization hook.
var valsCollected: Seq[A] = _

/**
 * Materializes the values of this bag on first use.
 *
 * Evaluates `eval` and stores the result in `valsCollected` if nothing has been stored yet
 * (i.e. `valsCollected` is still null), and logs how long the evaluation took in milliseconds.
 * Subsequent calls are no-ops as long as the cached value is non-null.
 */
private def collectVals(): Unit =
  if (valsCollected == null) {
    val begin = System.nanoTime
    valsCollected = eval
    // Nanoseconds -> milliseconds, rounded to the nearest whole millisecond.
    val elapsedMs = Math.round((System.nanoTime - begin) / 1e6)
    logger.info(s"Fetching ParallelizedDataBag '$name' to the driver took $elapsedMs ms")
  }

/**
 * The elements of this bag as a `Seq[A]`.
 *
 * Triggers `collectVals()` first, so the values are evaluated on the first access and the
 * cached `valsCollected` is returned afterwards.
 */
private[emma] def vals: Seq[A] = { collectVals(); valsCollected }

// Custom Java serialization hook. Makes sure the values have been collected before the
// object's fields are written, so that the serialized form carries the materialized values.
// The order matters: collectVals() must run before defaultWriteObject().
@throws(classOf[IOException])
private def writeObject(out: java.io.ObjectOutputStream): Unit = {
// Populate valsCollected (no-op if it has already been populated).
collectVals()
// Delegate to the default field-by-field serialization.
out.defaultWriteObject()
}
}

object DataBag {

Expand Down

0 comments on commit cedbe42

Please sign in to comment.