diff --git a/emma-common/src/main/scala/eu/stratosphere/emma/api/DataBag.scala b/emma-common/src/main/scala/eu/stratosphere/emma/api/DataBag.scala index 4766e2301..8043149c0 100644 --- a/emma-common/src/main/scala/eu/stratosphere/emma/api/DataBag.scala +++ b/emma-common/src/main/scala/eu/stratosphere/emma/api/DataBag.scala @@ -1,9 +1,5 @@ package eu.stratosphere.emma.api -import java.io.IOException - -import eu.stratosphere.emma.runtime.logger - import scala.language.experimental.macros /** @@ -157,47 +153,17 @@ sealed abstract class DataBag[+A] extends Serializable { /** * A DataBag backed by a parallel representation that can be evaluated in order to materialize the underlying values. * - * Note: We want to avoid calling eval, unless we need to. We need to call it if the user - * 1. calls fetch() - * 2. uses the DataBag outside of the parallelize block - * 3. uses the DataBag in a higher-order context - * In the 3. case, the ParallelizedDataBag gets serialized and sent to the cluster. This wouldn't work - * if we just sent the repr, because RDD or DataSet operations (like .collect()) can't be called from UDFs. - * So, in this case, we detect that we are being serialized and call eval before it (and not ship the repr). - * * @param name The name identifying the backing parallel representation. * @param repr The parallel representation for this bag. - * @param eval The method that evaluates the parallel representation and fetches the values as a Seq[A]. + * @param vals The method that evaluates the parallel representation and fetches the values as a Seq[A]. * @tparam A The element type of this bag. * @tparam R The type of the parallel representation. */ +// FIXME: can we compute `vals` lazily without breaking serialization? sealed class ParallelizedDataBag[A, R] private[api]( - val name: String, - @transient val repr: R, - eval: => Seq[A]) extends DataBag[A] with java.io.Serializable { - - var valsCollected: Seq[A] = _ - - private def collectVals() = { - if(valsCollected == null) { - val startTime = System.nanoTime - valsCollected = eval - val time = Math.round((System.nanoTime - startTime) / 1e6) - logger.info(s"Fetching ParallelizedDataBag '$name' to the driver took $time ms") - } - } - - private[emma] def vals: Seq[A] = { - collectVals() - valsCollected - } - - @throws(classOf[IOException]) - private def writeObject(out: java.io.ObjectOutputStream): Unit = { - collectVals() - out.defaultWriteObject() - } -} + @transient val name: String, + @transient val repr: R, + private[emma] val vals: Seq[A]) extends DataBag[A] object DataBag {