Skip to content

Commit

Permalink
Update Example.java
Browse files Browse the repository at this point in the history
  • Loading branch information
jiayuasu committed Aug 18, 2016
1 parent 71c1c0f commit 51f136c
Showing 1 changed file with 4 additions and 104 deletions.
108 changes: 4 additions & 104 deletions src/main/java/org/datasyslab/geospark/showcase/Example.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ public static void main(String[] args) {
}
conf=new SparkConf().setAppName(queryName+"+"+inputLocation+"+"+gridType+"+"+cores+"+"+numPartitions).setMaster(masterName)
.set("spark.history.fs.logDirectory", "/home/ubuntu/sparkeventlog")
//.set("spark.history.fs.logDirectory", "/data1/jia/sparkeventlog")
.set("spark.history.retainedApplications", "10000")
.set("spark.eventLog.enabled", "true")
.set("spark.eventLog.dir", "/home/ubuntu/sparkeventlog")
//.set("spark.eventLog.dir", "/data1/jia/sparkeventlog")
.set("spark.executor.memory", "50g")
.set("spark.core.connection.ack.wait.timeout","900")
.set("spark.akka.timeout","900")
Expand All @@ -114,14 +112,11 @@ public static void main(String[] args) {
try {
switch(queryName)
{
case "formattransform":
formatTransform(args[0]);
break;
case "pointrange":
testSpatialRangeQuery();
break;
case "pointrangeindex":
testSpatialRangeQueryUsingIndex();
testSpatialRangeQueryUsingIndex();
break;
case "pointknn":
testSpatialKnnQuery();
Expand All @@ -135,9 +130,6 @@ public static void main(String[] args) {
case "pointjoinindex":
testSpatialJoinQueryUsingIndex();
break;
case "pointjoinspark":
testSpatialJoinQueryUsingSpark();
break;
default:
throw new Exception("Query type is not recognized, ");
}
Expand All @@ -148,33 +140,6 @@ public static void main(String[] args) {

TearDown();
}
public static void formatTransform(String outputname) throws Exception
{
PolygonRDD polygonRDD = new PolygonRDD(sc, inputLocation, offset, splitter);
JavaRDD<String> result=polygonRDD.rawPolygonRDD.map(new Function<Polygon,String>()
{
@Override
public String call(Polygon v1) throws Exception {
// TODO Auto-generated method stub
Envelope envelope=v1.getEnvelopeInternal();
String result="";
result=envelope.getMinX()+"\t"+envelope.getMinY()+"\t"+envelope.getMaxX()+"\t"+envelope.getMaxY();
return result;
}

});
FileWriter fw = new FileWriter(outputname+".new");
BufferedWriter bw = new BufferedWriter(fw);
PrintWriter out = new PrintWriter(bw);
List<String> output=result.collect();
for(String str:output)
{
out.println(str);
}
out.close();
bw.close();
fw.close();
}
public static void testSpatialRangeQuery() throws Exception {
Random random=new Random();
double randomNumber=random.nextInt(10)+random.nextDouble();
Expand All @@ -188,18 +153,7 @@ public static void testSpatialRangeQuery() throws Exception {
}
}

public static void testSpatialRangeQuery1() throws Exception {
Random random=new Random();
double randomNumber=random.nextInt(10)+random.nextDouble();
queryEnvelope=new Envelope (-90.01+randomNumber,-80.01+randomNumber,30.01+randomNumber,40.01+randomNumber);
PointRDD pointRDD = new PointRDD(sc, inputLocation, offset, splitter);
pointRDD.rawPointRDD.persist(StorageLevel.MEMORY_ONLY());
for(int i=0;i<loopTimes;i++)
{
long resultSize = RangeQuery.SpatialRangeQuery(pointRDD, queryEnvelope, 0).getRawPointRDD().count();
assert resultSize>-1;
}
}


public static void testSpatialRangeQueryUsingIndex() throws Exception {
Random random=new Random();
Expand Down Expand Up @@ -245,13 +199,7 @@ public static void testSpatialJoinQuery() throws Exception {
RectangleRDD rectangleRDD = new RectangleRDD(sc, inputLocation2, offset2, splitter2);
//polygonRDD.rawPolygonRDD.unpersist();
RectangleRDD objectRDD;
if(gridType=="voronoi"){
objectRDD = new RectangleRDD(sc, inputLocation, offset ,splitter,gridType,numPartitions2);
}
else
{
objectRDD = new RectangleRDD(sc, inputLocation, offset ,splitter,gridType,numPartitions);
}
objectRDD = new RectangleRDD(sc, inputLocation, offset ,splitter,gridType,numPartitions);
objectRDD.gridRectangleRDD.persist(StorageLevel.MEMORY_ONLY());
JoinQuery joinQuery = new JoinQuery(sc,objectRDD,rectangleRDD);
for(int i=0;i<loopTimes;i++)
Expand All @@ -266,13 +214,7 @@ public static void testSpatialJoinQueryUsingIndex() throws Exception {
RectangleRDD rectangleRDD = new RectangleRDD(sc, inputLocation2, offset2, splitter2);
//polygonRDD.rawPolygonRDD.unpersist();
RectangleRDD objectRDD;
if(gridType=="voronoi"){
objectRDD = new RectangleRDD(sc, inputLocation, offset ,splitter,gridType,numPartitions2);
}
else
{
objectRDD = new RectangleRDD(sc, inputLocation, offset ,splitter,gridType,numPartitions);
}
objectRDD = new RectangleRDD(sc, inputLocation, offset ,splitter,gridType,numPartitions);
objectRDD.buildIndex("rtree");
JoinQuery joinQuery = new JoinQuery(sc,objectRDD,rectangleRDD);
for(int i=0;i<loopTimes;i++)
Expand All @@ -283,48 +225,6 @@ public static void testSpatialJoinQueryUsingIndex() throws Exception {
}


public static void testSpatialJoinQueryUsingSpark() throws Exception {
RectangleRDD rectangleRDD = new RectangleRDD(sc, inputLocation2, offset2, splitter2);
//polygonRDD.rawPolygonRDD.unpersist();
RectangleRDD objectRDD = new RectangleRDD(sc, inputLocation, offset, splitter);
List<Envelope> queryWindows=objectRDD.rawRectangleRDD.collect();
final Broadcast<List<Envelope>> queryWindowsBroadcast=sc.broadcast(queryWindows);
for(int i=0;i<loopTimes;i++)
{

//JavaPairRDD<Envelope,Point> joinRDD=rectangleRDD.rawRectangleRDD.cartesian(pointRDD.rawPointRDD);
JavaPairRDD<Envelope,Envelope> filteredJoinRDD=objectRDD.rawRectangleRDD.flatMapToPair(new PairFlatMapFunction<Envelope,Envelope,Envelope>()
{

@Override
public Iterable<Tuple2<Envelope, Envelope>> call(Envelope t)
throws Exception {
// TODO Auto-generated method stub
List<Envelope> queryWindowSet=queryWindowsBroadcast.getValue();
Iterator<Envelope> queryWindowIterator=queryWindowSet.iterator();
final HashSet<Tuple2<Envelope, Envelope>> result=new HashSet<Tuple2<Envelope, Envelope>>();
while(queryWindowIterator.hasNext())
{
Envelope window=queryWindowIterator.next();
if(window.contains(t))
{
result.add(new Tuple2<Envelope, Envelope>(window,t));
}

}
return result;
}

});
queryWindowsBroadcast.destroy();
JavaPairRDD<Envelope,Iterable<Envelope>> resultRDD=filteredJoinRDD.groupByKey();
long resultSize=resultRDD.count();
assert resultSize>-1;
}
}



public static void TearDown() {
sc.stop();
}
Expand Down

0 comments on commit 51f136c

Please sign in to comment.