-
Notifications
You must be signed in to change notification settings - Fork 0
/
karger.scala
72 lines (65 loc) · 2.54 KB
/
karger.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
:load ../../../project/program/mst_low_runtime.scala
import scala.util.Random
import org.apache.spark.graphx.GraphOps
/* Function: max_weight
 * --------------------
 * Picks the heavier of two edges, comparing their Int attributes.
 * When both attributes are equal the first edge (e1) is returned.
 * Intended as the combining function of a reduce over edges.
 */
def max_weight(e1: Edge[Int], e2: Edge[Int]): Edge[Int] =
  if (e2.attr > e1.attr) e2 else e1
/* Function: get_cut_size
 * ----------------------
 * Contribution of a single edge to the size of a cut.
 *
 * edge:    the edge to test.
 * ccs:     lookup from vertex id to the id of the component holding it.
 * returns: 1 when the two endpoints of the edge map to different
 *          components (the edge crosses the cut), 0 otherwise.
 *
 * Both endpoints must be keys of ccs; a missing key makes the lookup throw.
 */
def get_cut_size(edge: Edge[Int],
                 ccs: Map[VertexId, VertexId]): Int = {
  val srcComponent = ccs(edge.srcId)
  val dstComponent = ccs(edge.dstId)
  if (srcComponent == dstComponent) 0 else 1
}
/* Function: min_cut_karger
 * ------------------------
 * Min-cut approximation in the style of Karger's contraction algorithm.
 * Runs N^2 * ceil(log2(N)) sampling rounds, N being the vertex count of G.
 * Each round:
 *   - gives every edge a fresh random weight in [0, Int.MaxValue),
 *   - calls findMST on the randomly weighted graph,
 *   - selects the heaviest randomly weighted edge and drops edges of G
 *     based on its endpoints,
 *   - runs connected components on what is left, and
 *   - counts the edges of G whose endpoints ended up in different
 *     components (the size of the sampled cut).
 *
 * G:       the input graph; only vertex ids and edge endpoints are read.
 * returns: the smallest cut size seen over all rounds, or
 *          Double.PositiveInfinity when no round runs (N < 2).
 *
 * Requires the spark-shell SparkContext `sc` and `findMST` (loaded from
 * mst_low_runtime.scala) to be in scope.
 */
def min_cut_karger(G: Graph[Int, Int]): Double = {
  // Graphs are immutable: partitionBy returns a NEW graph, so its result has
  // to be kept. The re-partitioned edges are reused in every round, hence
  // cached here and released before returning.
  val edges = G.partitionBy(PartitionStrategy.EdgePartition2D).edges
  edges.cache()

  // count() avoids shipping every vertex to the driver just to size the loop.
  val N: Long = G.vertices.count()

  // Round count is kept in Long: N * N * ceil(log2 N) overflows Int quickly,
  // which would silently produce an empty loop.
  val log2N: Long =
    if (N < 2) 0L
    else scala.math.ceil(scala.math.log(N.toDouble) / scala.math.log(2)).toLong
  val rounds: Long = N * N * log2N

  var min_cut: Double = Double.PositiveInfinity
  var round = 0L
  while (round < rounds) {
    // One generator per partition instead of one per edge.
    val arb_weights = edges.mapPartitions { part =>
      val rng = new Random
      part.map(e => Edge(e.srcId, e.dstId, rng.nextInt(Int.MaxValue)))
    }
    arb_weights.cache() // Pins the random weights so every use below agrees.

    val G_arb = Graph(G.vertices, arb_weights)
    // NOTE(review): the MST result is never used below. The header describes
    // removing the heaviest MST edge, but `maxw` is taken over ALL edges.
    // Using `mst` here needs findMST's return type, which is defined in
    // another file - confirm and rework.
    val (mst, cc) = findMST(G_arb)
    val maxw = arb_weights.reduce(max_weight)

    // Capture only the two ids in the closure rather than the whole edge.
    val maxSrc = maxw.srcId
    val maxDst = maxw.dstId
    // NOTE(review): this predicate drops every edge that shares a source OR
    // a destination with `maxw`, not just `maxw` itself. Left as-is because
    // the intended sub-graph depends on the MST question above - confirm.
    val G_contract = new GraphOps(Graph(G.vertices,
      edges.filter(e => e.srcId != maxSrc && e.dstId != maxDst)))

    // Vertex id -> component id, collected to the driver and broadcast once.
    val cc_contract = G_contract.connectedComponents().
      vertices.
      collect().
      toMap
    val cc_bc = sc.broadcast(cc_contract)

    val tmp_cut = edges.
      map{get_cut_size(_, cc_bc.value)}.
      reduce(_ + _)
    if (tmp_cut < min_cut)
      min_cut = tmp_cut

    // Release per-round resources; otherwise they pile up over all rounds.
    cc_bc.destroy()
    arb_weights.unpersist()
    round += 1
  }

  edges.unpersist()
  min_cut
}