diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala
index 59e475e9..7ac0f5dc 100644
--- a/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala
+++ b/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala
@@ -25,96 +25,96 @@ import com.github.javacliparser._
 import org.apache.spark.streamdm.core.specification.ExampleSpecification
 
 /**
- * Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a
- * small (weighted) sample of the stream by using coresets, and then uses
- * it as an input to a k-means++ algorithm. It uses a data structure called
- * BucketManager to handle the coresets.
- *
- * <p>It uses the following options:
- * <ul>
- *  <li> Number of clusters (<b>-k</b>)
- *  <li> Number of k-means iterations (<b>-i</b>)
- *  <li> Size of the coreset (<b>-s</b>)
- *  <li> Width of the window (<b>-w</b>)
- * </ul>
- */
+ * Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a
+ * small (weighted) sample of the stream by using coresets, and then uses
+ * it as an input to a k-means++ algorithm. It uses a data structure called
+ * BucketManager to handle the coresets.
+ *
+ * <p>It uses the following options:
+ * <ul>
+ *  <li> Number of clusters (<b>-k</b>)
+ *  <li> Number of k-means iterations (<b>-i</b>)
+ *  <li> Size of the coreset (<b>-s</b>)
+ *  <li> Width of the window (<b>-w</b>)
+ * </ul>
+ */
 class StreamKM extends Clusterer {
-
-  type T = BucketManager
+  type T = BucketManager
 
   var bucketmanager: BucketManager = null
   var numInstances: Long = 0
-  var initialBuffer: Array[Example] = Array[Example]()
-
+  var clusters: Array[Example] = null
+
   val kOption: IntOption = new IntOption("numClusters", 'k',
     "Number of clusters for output", 10, 1, Integer.MAX_VALUE)
-
+
   val repOption: IntOption = new IntOption("kMeansIters", 'i',
     "Number of k-means iterations", 1000, 1, Integer.MAX_VALUE)
 
   val sizeCoresetOption: IntOption = new IntOption("sizeCoreset", 's',
     "Size of coreset", 10000, 1, Integer.MAX_VALUE)
-
+
   val widthOption: IntOption = new IntOption("width",
-    'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE);
-
+    'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE);
+
   var exampleLearnerSpecification: ExampleSpecification = null
-
-  /**
-   * Init the StreamKM++ algorithm.
-   */
+
+  /**
+   * Init the StreamKM++ algorithm.
+   */
   def init(exampleSpecification: ExampleSpecification) : Unit = {
     exampleLearnerSpecification = exampleSpecification
     bucketmanager = new BucketManager(widthOption.getValue, sizeCoresetOption.getValue)
   }
-
-  /**
-   * Maintain the BucketManager for coreset extraction, given an input DStream of Example.
-   * @param input a stream of instances
-   */
-  def train(input: DStream[Example]): Unit = {
-    input.foreachRDD(rdd => {
-      rdd.foreach(ex => {
-        bucketmanager = bucketmanager.update(ex)
-        numInstances += 1
-      })
-    })
-  }
-
+
+  // Training happens incrementally in assign(), where every incoming example
+  // updates the BucketManager, so train() is intentionally a no-op.
+  def train(input: DStream[Example]): Unit = {}
+
   /**
-   * Gets the current Model used for the Learner.
-   * @return the Model object used for training
-   */
+   * Gets the current Model used for the Learner.
+   * @return the Model object used for training
+   */
   def getModel: BucketManager = bucketmanager
-
-  /**
-   * Get the currently computed clusters
-   * @return an Array of Examples representing the clusters
-   */
+
+  /**
+   * Get the currently computed clusters
+   * @return an Array of Examples representing the clusters
+   */
   def getClusters: Array[Example] = {
     if(numInstances <= sizeCoresetOption.getValue) {
       bucketmanager.buckets(0).points.toArray
-    }
+    }
     else {
-      val streamingCoreset = bucketmanager.getCoreset
-      KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
+      val streamingCoreset = bucketmanager.getCoreset
+      KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
     }
   }
-
+
   /**
-   * Assigns examples to clusters, given the current Clusters data structure.
-   * @param input the DStream of Examples to be assigned a cluster
-   * @return a DStream of tuples containing the original Example and the
-   *         assigned cluster.
-   */
+   * Maintains the BucketManager for coreset extraction and assigns each
+   * incoming Example to its nearest cluster.
+   * @param input a stream of instances
+   * @return a DStream of tuples containing the original Example and the
+   *         assigned cluster.
+   */
   def assign(input: DStream[Example]): DStream[(Example,Double)] = {
-    input.map(x => {
-      val assignedCl = getClusters.foldLeft((0,Double.MaxValue,0))(
-        (cl,centr) => {
-          val dist = centr.in.distanceTo(x.in)
-          if(dist<cl._2) ((cl._3,dist,cl._3+1))
-          else ((cl._1,cl._2,cl._3+1))
-        })._1
-      (x,assignedCl)
+    input.map(ex => {
+      numInstances += 1
+      bucketmanager = bucketmanager.update(ex)
+      // Recluster after every example: k-means on the raw buffered points
+      // while the stream is small, on the extracted coreset afterwards
+      if (numInstances <= sizeCoresetOption.getValue) {
+        clusters = KMeans.cluster(bucketmanager.buckets(0).points.toArray,
+          kOption.getValue, repOption.getValue)
+      }
+      else {
+        val streamingCoreset = bucketmanager.getCoreset
+        clusters = KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
+      }
+
+      val assignedCl = clusters.foldLeft((0, Double.MaxValue, 0))(
+        (cl, centr) => {
+          val dist = centr.in.distanceTo(ex.in)
+          if (dist < cl._2) ((cl._3, dist, cl._3 + 1))
+          else ((cl._1, cl._2, cl._3 + 1))
+        })._1
+      (ex, assignedCl)
     })
   }
 }
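With this change train() is a no-op and assign() does the work: it feeds each example to the BucketManager, reruns k-means (on the raw buffer while the stream is shorter than the coreset size, on the coreset afterwards), and picks the nearest centre with a foldLeft whose accumulator is (bestIndex, bestDistance, currentIndex). Below is a minimal sketch of that fold on plain Array[Double] vectors; euclidean() is a hypothetical stand-in for the patch's Instance.distanceTo.

    object NearestCentreSketch {
      // Hypothetical stand-in for Instance.distanceTo
      def euclidean(a: Array[Double], b: Array[Double]): Double =
        math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

      // Accumulator is (bestIndex, bestDistance, currentIndex), as in assign()
      def nearest(centres: Array[Array[Double]], x: Array[Double]): Int =
        centres.foldLeft((0, Double.MaxValue, 0)) { (cl, centre) =>
          val dist = euclidean(centre, x)
          if (dist < cl._2) (cl._3, dist, cl._3 + 1)
          else (cl._1, cl._2, cl._3 + 1)
        }._1

      def main(args: Array[String]): Unit = {
        val centres = Array(Array(0.0, 0.0), Array(5.0, 5.0))
        println(nearest(centres, Array(4.0, 4.5))) // prints 1
      }
    }

Note that reclustering inside input.map runs k-means once per record; that is the patch's trade-off for always-fresh centres, not something the fold itself requires.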
diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala
index e9f4f5a9..d3140664 100755
--- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala
+++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streamdm.clusterers.clusters
 
 import org.apache.spark.streamdm.core._
 import scala.collection.mutable.Queue
-import scala.util.control.Breaks._
 
 /**
@@ -31,7 +30,6 @@ import scala.util.control.Breaks._
 class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
 
   type T = BucketManager
-
   /**
    * Inner class Bucket for new instance management, this class has two buffers for
    * recursively computing the coresets.
@@ -57,7 +55,7 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
     // Check if there is enough space in the first bucket
     if(buckets(0).isFull){
       var curbucket : Int = 0
-      var nextbucket : Int =1
+      var nextbucket : Int = 1
       // Check if the next bucket is empty
       if(!buckets(nextbucket).isFull) {
         // Copy curbucket points to nextbucket points
@@ -122,25 +120,29 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
    * @return the coreset for the examples entered into the buckets.
    */
   def getCoreset: Array[Example] = {
-    if(buckets(L-1).isFull) {
-      buckets(L-1).points.toArray
-    }else {
-      var i = 0
+    if (buckets(L - 1).isFull) {
+      buckets(L - 1).points.toArray
+    } else {
       var coreset = Array[Example]()
-      for(i <- 0 until L) {
-        if(buckets(i).isFull) {
-          coreset = buckets(i).points.toArray
-          break
-        }
-      }
-      val start = i+1
-      for(j <- start until L) {
+      // Remember the index of the first full bucket; the for-comprehension
+      // variable is a fresh binding, so it cannot be read after the loop
+      // (the old `val start = i+1` always saw i == 0)
+      var firstFull = -1
+      for (i <- 0 until L) {
+        if (buckets(i).isFull && firstFull < 0) {
+          coreset = buckets(i).points.toArray
+          firstFull = i
+        }
+      }
+      val start = firstFull + 1
+      for (j <- start until L) {
         val examples = buckets(j).points.toArray ++ coreset
         val tree = new TreeCoreset
         coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize),
           new Array[Example](0))
       }
       coreset
     }
   }
 }
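getCoreset now finds the first full bucket with an explicit index instead of Breaks, then folds every later bucket into the running coreset. The sketch below isolates that merge-and-reduce pass under stated simplifications: Point stands in for Example, and reduceToCoreset is a placeholder for the buildCoresetTree/retrieveCoreset pair, here replaced by simple truncation.

    object MergeReduceSketch {
      type Point = Array[Double]

      // Placeholder reduction: keep at most maxsize points; the real code
      // builds a coreset tree over the merged points and retrieves one
      // representative per leaf
      def reduceToCoreset(points: Array[Point], maxsize: Int): Array[Point] =
        points.take(maxsize)

      // Start from the first full (here: non-empty) bucket, then merge each
      // later bucket into the running coreset, as the fixed getCoreset does
      def coresetOf(buckets: Array[Array[Point]], maxsize: Int): Array[Point] = {
        val first = buckets.indexWhere(_.nonEmpty)
        if (first < 0) Array.empty[Point]
        else buckets.drop(first + 1).foldLeft(buckets(first)) { (coreset, bucket) =>
          reduceToCoreset(bucket ++ coreset, maxsize)
        }
      }
    }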
diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala
index c0a3208c..426cf709 100755
--- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala
+++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala
@@ -82,17 +82,16 @@ class TreeCoreset {
      * Select a new centre from the leaf node for splitting.
      * @return the new centre
      */
-    def chooseCentre() : Example = {
+    def chooseCentre() : Example = {
       val funcost = this.weightedLeaf().cost
       val points = elem.points
       var sum = 0.0
-
-      for(point <- points) {
-        sum += costOfPoint(point)/funcost
-        if(sum >= Random.nextDouble)
-          return point
-      }
-      elem.centre
+      for (point <- points) {
+        sum += costOfPoint(point) / funcost
+        if (sum >= Random.nextDouble)
+          return point
+      }
+      elem.centre
     }
   }
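chooseCentre accumulates each point's cost normalised by the leaf's total cost and returns the first point whose running sum crosses a random draw, falling back to the current centre; this is cost-proportional, k-means++-style D² sampling. A sketch of the textbook single-threshold form follows; the patched loop instead draws a fresh Random.nextDouble on every iteration, so the two differ in detail. sqDist is an assumed helper mirroring squaredDistance.

    import scala.util.Random

    object ChooseCentreSketch {
      type Point = Array[Double]

      // Assumed helper mirroring TreeCoreset.squaredDistance
      def sqDist(a: Point, b: Point): Double =
        a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

      // Pick a point with probability proportional to its squared distance
      // from the current centre; fall back to the old centre as the patch does
      def chooseCentre(points: Array[Point], centre: Point): Point = {
        val total = points.map(sqDist(_, centre)).sum
        val threshold = Random.nextDouble() * total
        var cumulative = 0.0
        for (p <- points) {
          cumulative += sqDist(p, centre)
          if (cumulative >= threshold) return p
        }
        centre
      }
    }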
@@ -119,34 +118,49 @@ class TreeCoreset {
    * @param leaf coreset tree leaf for spliting
    * @return a coreset tree node with two leaves
    */
-  private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = {
-    // Select a example from the points associated with the leaf as a new centre
-    // for one of the new leaf
-    val newcentre = leaf.chooseCentre
+  private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = {
+    // Select an example from the points associated with the leaf as the
+    // centre of one new leaf; a var so a degenerate split can be retried
+    var newcentre = leaf.chooseCentre
     // The original centre as the other leaf centre
     val oldcentre = leaf.elem.centre
     // The points associated with the orignial leaf, the points will be assigned the new leaves
     val points = leaf.elem.points
-    // Assign points to leftpoints and rightpoints
     var leftpoints = new Array[Example](0)
     var rightpoints = new Array[Example](0)
-    for(point <- points) {
-      if(squaredDistance(point, newcentre) < squaredDistance(point, oldcentre))
+    for (point <- points) {
+      if (squaredDistance(point, newcentre) < squaredDistance(point, oldcentre))
         leftpoints = leftpoints :+ point
       else
         rightpoints = rightpoints :+ point
     }
+    // Prevent assigning all points to one child: retry the split once with a
+    // fresh centre, reassigning the outer buffers instead of shadowing them
+    if ((leftpoints.length == 0 || rightpoints.length == 0) && points.length > 1) {
+      newcentre = leaf.chooseCentre
+      leftpoints = new Array[Example](0)
+      rightpoints = new Array[Example](0)
+      for (point <- points) {
+        if (squaredDistance(point, newcentre) < squaredDistance(point, oldcentre))
+          leftpoints = leftpoints :+ point
+        else
+          rightpoints = rightpoints :+ point
+      }
+    }
 
     // Create new leaves
     val leftElem = new CoresetTreeElem(leftpoints.length, leftpoints, newcentre)
     val leftleaf = CoresetTreeLeaf(leftElem, 0.0).weightedLeaf
 
     val rightElem = new CoresetTreeElem(rightpoints.length, rightpoints, oldcentre)
     val rightleaf = CoresetTreeLeaf(rightElem, 0.0).weightedLeaf
 
     // Return a coreset tree node with two leaves
-    new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost+rightleaf.cost)
+    new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost + rightleaf.cost)
   }
 
@@ -159,15 +173,39 @@ class TreeCoreset {
       splitCoresetTreeLeaf(CoresetTreeLeaf(e, c))
     }
     case CoresetTreeNode(e, l, r, c) => {
-      if (Random.nextDouble > 0.5) {
-        val lchild = splitCoresetTree(l)
-        val newcost = lchild.cost + r.cost
-        CoresetTreeNode(e, lchild, r, newcost)
+      if (l.cost == 0 && r.cost == 0) {
+        // Zero-cost children carry no information for the weighted choice:
+        // split whichever side still has points, otherwise flip a fair coin.
+        // The branches must be chained with else-if; otherwise the first
+        // result would be computed and silently discarded.
+        if (l.elem.n == 0) {
+          val rchild = splitCoresetTree(r)
+          val newcost = l.cost + rchild.cost
+          CoresetTreeNode(e, l, rchild, newcost)
+        }
+        else if (r.elem.n == 0) {
+          val lchild = splitCoresetTree(l)
+          val newcost = lchild.cost + r.cost
+          CoresetTreeNode(e, lchild, r, newcost)
+        }
+        else if (Random.nextDouble > 0.5) {
+          val lchild = splitCoresetTree(l)
+          val newcost = lchild.cost + r.cost
+          CoresetTreeNode(e, lchild, r, newcost)
+        }
+        else {
+          val rchild = splitCoresetTree(r)
+          val newcost = l.cost + rchild.cost
+          CoresetTreeNode(e, l, rchild, newcost)
+        }
       }
       else {
-        val rchild = splitCoresetTree(r)
-        val newcost = l.cost + rchild.cost
-        CoresetTreeNode(e, l, rchild, newcost)
+        // Descend into a child with probability proportional to its cost
+        if (Random.nextDouble < l.cost / root.cost) {
+          val lchild = splitCoresetTree(l)
+          val newcost = lchild.cost + r.cost
+          CoresetTreeNode(e, lchild, r, newcost)
+        } else {
+          val rchild = splitCoresetTree(r)
+          val newcost = l.cost + rchild.cost
+          CoresetTreeNode(e, l, rchild, newcost)
+        }
       }
     }
   }
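The rewritten splitCoresetTree picks, at every inner node, which subtree to refine: a child with no points is never taken, a zero-cost tie falls back to a fair coin, and otherwise the left child is chosen with probability l.cost / root.cost. Below is a toy sketch of the cost-weighted part of that rule (the empty-child guard is omitted for brevity); CTree, CNode and CLeaf are stand-in types, not the patch's CoresetTree classes.

    import scala.util.Random

    // Walk down the tree, picking a child with probability proportional to
    // its cost, so high-cost regions are refined more often
    sealed trait CTree { def cost: Double }
    case class CLeaf(cost: Double) extends CTree
    case class CNode(l: CTree, r: CTree, cost: Double) extends CTree

    object DescentSketch {
      def pathToLeaf(t: CTree): List[String] = t match {
        case CLeaf(_) => Nil
        case CNode(l, r, c) =>
          // Guard the degenerate zero-cost case with a fair coin, as the patch does
          val goLeft =
            if (l.cost == 0 && r.cost == 0) Random.nextDouble() > 0.5
            else Random.nextDouble() < l.cost / c
          if (goLeft) "L" :: pathToLeaf(l) else "R" :: pathToLeaf(r)
      }

      def main(args: Array[String]): Unit = {
        val tree = CNode(CLeaf(3.0), CNode(CLeaf(1.0), CLeaf(2.0), 3.0), 6.0)
        println(pathToLeaf(tree).mkString) // e.g. "L" or "RR"
      }
    }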