From 2ae41f48f32b800f5dab6d589ebf8fad0e2fb973 Mon Sep 17 00:00:00 2001 From: u5482878 Date: Thu, 19 Apr 2018 19:14:07 +1000 Subject: [PATCH 1/6] Using one call-back function with a DPE flag for notifying collective completion status Removed failed_coll_handler, and added a boolean flag to coll_handler to notify process failure errors that leads to throwing a DeadPlaceException in Team. --- x10.runtime/src-cpp/x10aux/network.cc | 9 +++------ x10.runtime/src-cpp/x10aux/network.h | 4 +--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/x10.runtime/src-cpp/x10aux/network.cc b/x10.runtime/src-cpp/x10aux/network.cc index b97bc01f3b..ec4486a84c 100644 --- a/x10.runtime/src-cpp/x10aux/network.cc +++ b/x10.runtime/src-cpp/x10aux/network.cc @@ -460,13 +460,10 @@ void *x10aux::coll_enter() { return fs; } -void x10aux::coll_handler(void *arg) { - x10::xrx::FinishState* fs = (x10::xrx::FinishState*)arg; - fs->notifyActivityTermination(); -} -//used with ULFM, called only when a collective has failed due to a process failure -void x10aux::failed_coll_handler(void *arg) { +void x10aux::coll_handler(void *arg, bool throwDPE) { x10::xrx::FinishState* fs = (x10::xrx::FinishState*)arg; + if (throwDPE) //triggered only by ULFM when native MPI collectives fail + fs->pushException(x10::lang::DeadPlaceException::_make(x10::lang::String::Lit("[Native] Team contains at least one dead member"))); fs->notifyActivityTermination(); } diff --git a/x10.runtime/src-cpp/x10aux/network.h b/x10.runtime/src-cpp/x10aux/network.h index 4e8aad5b98..1c5f164366 100644 --- a/x10.runtime/src-cpp/x10aux/network.h +++ b/x10.runtime/src-cpp/x10aux/network.h @@ -199,12 +199,10 @@ namespace x10aux { // teams void *coll_enter(); - void coll_handler(void *arg); + void coll_handler(void *arg, bool throwDPE); void *coll_enter2(void *arg); void coll_handler2(x10rt_team t, void *arg); - void failed_coll_handler(void *arg); - void register_place_removed_handler(x10::lang::VoidFun_0_1* body_fun); void notify_place_death(unsigned int place); } From c33fceac0eb6de2e8f8b293a2d0033b18986ae4e Mon Sep 17 00:00:00 2001 From: u5482878 Date: Thu, 19 Apr 2018 19:17:02 +1000 Subject: [PATCH 2/6] All low-level collective operations return void as they did before the first integration with ULFM Also use one call-back function for collective completion notification --- .../src-java/x10/x10rt/TeamSupport.java | 16 +++--- x10.runtime/x10rt/common/x10rt_emu_coll.cc | 29 +++++------ x10.runtime/x10rt/common/x10rt_front.cc | 27 +++++----- x10.runtime/x10rt/common/x10rt_internal.h | 3 -- x10.runtime/x10rt/common/x10rt_logical.cc | 49 +++++++------------ x10.runtime/x10rt/include/x10rt_front.h | 17 +++---- x10.runtime/x10rt/include/x10rt_logical.h | 15 ++---- x10.runtime/x10rt/include/x10rt_net.h | 15 ++---- x10.runtime/x10rt/include/x10rt_types.h | 2 +- x10.runtime/x10rt/jni/jni_team.cc | 19 ++++--- x10.runtime/x10rt/pami/x10rt_pami.cc | 25 ++++------ x10.runtime/x10rt/sockets/x10rt_sockets.cc | 22 +++------ .../x10rt/standalone/x10rt_standalone.cc | 15 ++---- x10.runtime/x10rt/test/x10rt_coll.cc | 7 ++- 14 files changed, 99 insertions(+), 162 deletions(-) diff --git a/x10.runtime/src-java/x10/x10rt/TeamSupport.java b/x10.runtime/src-java/x10/x10rt/TeamSupport.java index 6decf1a09c..6f66e61867 100644 --- a/x10.runtime/src-java/x10/x10rt/TeamSupport.java +++ b/x10.runtime/src-java/x10/x10rt/TeamSupport.java @@ -140,9 +140,8 @@ public static void nativeScatter(int id, int role, int root, Rail src, int sr } } - public static boolean nativeBcast(int id, 
int role, int root, Rail src, int src_off, + public static void nativeBcast(int id, int role, int root, Rail src, int src_off, Rail dst, int dst_off, int count) { - boolean success = true; if (!X10RT.forceSinglePlace) { int typeCode = getTypeCode(src); assert getTypeCode(dst) == typeCode : "Incompatible src and dst arrays"; @@ -153,12 +152,11 @@ public static boolean nativeBcast(int id, int role, int root, Rail src, int s FinishState fs = ActivityManagement.activityCreationBookkeeping(); try { - success =nativeBcastImpl(id, role, root, srcRaw, src_off, dstRaw, dst_off, count, typeCode, fs); + nativeBcastImpl(id, role, root, srcRaw, src_off, dstRaw, dst_off, count, typeCode, fs); } catch (UnsatisfiedLinkError e) { aboutToDie("nativeBcast"); } } - return success; } public static void nativeAllToAll(int id, int role, Rail src, int src_off, @@ -199,9 +197,8 @@ public static void nativeReduce(int id, int role, int root, Rail src, int src } } - public static boolean nativeAllReduce(int id, int role, Rail src, int src_off, + public static void nativeAllReduce(int id, int role, Rail src, int src_off, Rail dst, int dst_off, int count, int op) { - boolean success = true; if (!X10RT.forceSinglePlace) { int typeCode = getTypeCode(src); Object srcRaw = typeCode == RED_TYPE_COMPLEX ? copyComplexToNewDouble(src, src_off, count) : src.getBackingArray(); @@ -212,12 +209,11 @@ public static boolean nativeAllReduce(int id, int role, Rail src, int src_off FinishState fs = ActivityManagement.activityCreationBookkeeping(); try { - success = nativeAllReduceImpl(id, role, srcRaw, src_off, dstRaw, dst_off, count, op, typeCode, fs); + nativeAllReduceImpl(id, role, srcRaw, src_off, dstRaw, dst_off, count, op, typeCode, fs); } catch (UnsatisfiedLinkError e) { aboutToDie("nativeAllReduce"); } } - return success; } public static void nativeIndexOfMax(int id, int role, Rail src, @@ -296,7 +292,7 @@ private static native void nativeScatterImpl(int id, int role, int root, Object Object dstRaw, int dst_off, int count, int typecode, FinishState fs); - private static native Boolean nativeBcastImpl(int id, int role, int root, Object srcRaw, int src_off, + private static native void nativeBcastImpl(int id, int role, int root, Object srcRaw, int src_off, Object dstRaw, int dst_off, int count, int typecode, FinishState fs); @@ -308,7 +304,7 @@ private static native void nativeReduceImpl(int id, int role, int root, Object s Object dstRaw, int dst_off, int count, int op, int typecode, FinishState fs); - private static native Boolean nativeAllReduceImpl(int id, int role, Object srcRaw, int src_off, + private static native void nativeAllReduceImpl(int id, int role, Object srcRaw, int src_off, Object dstRaw, int dst_off, int count, int op, int typecode, FinishState fs); diff --git a/x10.runtime/x10rt/common/x10rt_emu_coll.cc b/x10.runtime/x10rt/common/x10rt_emu_coll.cc index b131c55ee6..01f4659598 100644 --- a/x10.runtime/x10rt/common/x10rt_emu_coll.cc +++ b/x10.runtime/x10rt/common/x10rt_emu_coll.cc @@ -406,7 +406,7 @@ void x10rt_emu_team_del (x10rt_team team, x10rt_place role, x10rt_completion_han { assert(gtdb[team]->placev[role] == x10rt_net_here()); gtdb.releaseTeam(team); - ch(arg); + ch(arg, false); } namespace { @@ -519,7 +519,7 @@ static void reduce_c_to_p_update_recv (const x10rt_msg_params *p) } m.reduce.rbuf = recv; if (m.reduce.started) { - m.reduce.ch(m.reduce.arg); + m.reduce.ch(m.reduce.arg, false); } //fprintf(stderr, "%d: Decrementing child from %d to %d\n", (int)role, (int) m.barrier.wait, (int) m.barrier.wait-1); 
m.barrier.childToReceive--; @@ -672,7 +672,7 @@ bool CollOp::progress (void) } safe_free(this); m.bcast.count = 0; // bcast completed - m.barrier.ch(m.barrier.arg); + m.barrier.ch(m.barrier.arg, false); return true; } } @@ -684,12 +684,12 @@ void CollOp::handlePendingReduce(MemberObj *m) { if (m->reduce.count > 0) { SYNCHRONIZED (global_lock); if (m->reduce.rbuf != NULL) { - m->reduce.ch(m->reduce.arg); + m->reduce.ch(m->reduce.arg, false); } if (m->reduce.rbuf2 != NULL) { m->reduce.rbuf = m->reduce.rbuf2; m->reduce.rbuf2 = NULL; - m->reduce.ch(m->reduce.arg); + m->reduce.ch(m->reduce.arg, false); } } } @@ -733,7 +733,7 @@ static void scatter_copy_recv (const x10rt_msg_params *p) m.scatter.data_done = true; if (m.scatter.barrier_done && m.scatter.ch != NULL) { PREEMPT (global_lock); - m.scatter.ch(m.scatter.arg); + m.scatter.ch(m.scatter.arg, false); } } @@ -744,7 +744,7 @@ namespace { }; } -static void scatter_after_barrier (void *arg) +static void scatter_after_barrier (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); TeamObj &t = *gtdb[m.team]; @@ -764,7 +764,7 @@ static void scatter_after_barrier (void *arg) m2->scatter.data_done = true; if (m2->scatter.barrier_done && m2->scatter.ch != NULL) { PREEMPT (global_lock); - m2->scatter.ch(m2->scatter.arg); + m2->scatter.ch(m2->scatter.arg, false); } } else { // serialise all the data @@ -782,7 +782,7 @@ static void scatter_after_barrier (void *arg) // the barrier must have completed or we wouldn't even be here // signal completion to root role if (m.scatter.ch != NULL) { - m.scatter.ch(m.scatter.arg); + m.scatter.ch(m.scatter.arg, false); } } else { @@ -792,7 +792,7 @@ static void scatter_after_barrier (void *arg) m.scatter.barrier_done = true; if (m.scatter.data_done && m.scatter.ch != NULL) { PREEMPT (global_lock); - m.scatter.ch(m.scatter.arg); + m.scatter.ch(m.scatter.arg, false); } } } @@ -835,7 +835,6 @@ bool x10rt_emu_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); //not used by Team.x10 @@ -852,7 +851,6 @@ void x10rt_emu_gather (x10rt_team team, x10rt_place role, bool x10rt_emu_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); //not used by Team.x10 @@ -861,7 +859,6 @@ bool x10rt_emu_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, bool x10rt_emu_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { TeamObj &t = *gtdb[team]; @@ -885,7 +882,7 @@ bool x10rt_emu_bcast (x10rt_team team, x10rt_place role, } -static void alltoall_intermediate (void *arg) +static void alltoall_intermediate (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); @@ -1076,7 +1073,7 @@ namespace { }; template - void reduce3 (void *arg) + void reduce3 (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); @@ -1234,7 +1231,7 @@ static void receive_new_team (x10rt_team new_team, void *arg) safe_free(m.split.newTeamPlaces); } -static void split (void *arg) +static void split (void *arg, bool dummy) { MemberObj &m = *(static_cast(arg)); TeamObj &t = *gtdb[m.team]; diff --git 
a/x10.runtime/x10rt/common/x10rt_front.cc b/x10.runtime/x10rt/common/x10rt_front.cc index f951e95e99..06c3e403d8 100644 --- a/x10.runtime/x10rt/common/x10rt_front.cc +++ b/x10.runtime/x10rt/common/x10rt_front.cc @@ -210,13 +210,12 @@ void x10rt_barrier (x10rt_team team, x10rt_place role, x10rt_lgl_barrier(team, role, ch, arg); } -bool x10rt_bcast (x10rt_team team, x10rt_place role, +void x10rt_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_bcast(team, role, root, sbuf, dbuf, el, count, errch, ch, arg); + x10rt_lgl_bcast(team, role, root, sbuf, dbuf, el, count, ch, arg); } void x10rt_scatter (x10rt_team team, x10rt_place role, @@ -227,14 +226,13 @@ void x10rt_scatter (x10rt_team team, x10rt_place role, x10rt_lgl_scatter(team, role, root, sbuf, dbuf, el, count, ch, arg); } -bool x10rt_scatterv (x10rt_team team, x10rt_place role, +void x10rt_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, errch, ch, arg); + x10rt_lgl_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, ch, arg); } @@ -246,13 +244,12 @@ void x10rt_gather (x10rt_team team, x10rt_place role, x10rt_lgl_gather (team, role, root, sbuf, dbuf, el, count, ch, arg); } -bool x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +void x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_gatherv (team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, errch, ch, arg); + x10rt_lgl_gatherv (team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, ch, arg); } void x10rt_alltoall (x10rt_team team, x10rt_place role, @@ -273,26 +270,24 @@ void x10rt_reduce (x10rt_team team, x10rt_place role, x10rt_lgl_reduce(team, role, root, sbuf, dbuf, op, dtype, count, ch, arg); } -bool x10rt_allreduce (x10rt_team team, x10rt_place role, +void x10rt_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_allreduce(team, role, sbuf, dbuf, op, dtype, count, errch, ch, arg); + x10rt_lgl_allreduce(team, role, sbuf, dbuf, op, dtype, count, ch, arg); } -bool x10rt_agree (x10rt_team team, x10rt_place role, +void x10rt_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - return x10rt_lgl_agree(team, role, sbuf, dbuf, errch, ch, arg); + x10rt_lgl_agree(team, role, sbuf, dbuf, ch, arg); } -void x10rt_one_setter (void *arg) +void x10rt_one_setter (void *arg, bool dummy) { *((int*)arg) = 1; } void x10rt_team_setter (x10rt_team v, void *arg) diff --git a/x10.runtime/x10rt/common/x10rt_internal.h b/x10.runtime/x10rt/common/x10rt_internal.h index 208e1e7495..90313fa3fa 100644 --- a/x10.runtime/x10rt/common/x10rt_internal.h +++ b/x10.runtime/x10rt/common/x10rt_internal.h @@ -151,7 +151,6 @@ X10RT_C void x10rt_emu_barrier 
(x10rt_team team, x10rt_place role, X10RT_C bool x10rt_emu_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); X10RT_C void x10rt_emu_scatter (x10rt_team team, x10rt_place role, @@ -163,7 +162,6 @@ X10RT_C bool x10rt_emu_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); X10RT_C void x10rt_emu_gather (x10rt_team team, x10rt_place role, @@ -175,7 +173,6 @@ X10RT_C bool x10rt_emu_gatherv (x10rt_team team, x10rt_place role, x10rt_place r const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); X10RT_C void x10rt_emu_alltoall (x10rt_team team, x10rt_place role, diff --git a/x10.runtime/x10rt/common/x10rt_logical.cc b/x10.runtime/x10rt/common/x10rt_logical.cc index 6504321484..6d0ab66277 100644 --- a/x10.runtime/x10rt/common/x10rt_logical.cc +++ b/x10.runtime/x10rt/common/x10rt_logical.cc @@ -101,7 +101,7 @@ static x10rt_error fatal (const char *format, ...) x10rt_stats x10rt_lgl_stats; -static void one_setter (void *arg) +static void one_setter (void *arg, bool throwDPE) { *((int*)arg) = 1; } const char *x10rt_lgl_error_msg (void) { @@ -1069,19 +1069,17 @@ void x10rt_lgl_barrier (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_bcast (x10rt_team team, x10rt_place role, +void x10rt_lgl_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_bcast(team, role, root, sbuf, dbuf, el, count, errch, ch, arg); + x10rt_net_bcast(team, role, root, sbuf, dbuf, el, count, ch, arg); } else { - x10rt_emu_bcast(team, role, root, sbuf, dbuf, el, count, errch, ch, arg); + x10rt_emu_bcast(team, role, root, sbuf, dbuf, el, count, ch, arg); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_bcast is not used in Team.x10 } } @@ -1099,33 +1097,28 @@ void x10rt_lgl_scatter (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, +void x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, errch, ch, arg); + x10rt_net_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, ch, arg); } else { - x10rt_emu_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, errch, ch, arg); + x10rt_emu_scatterv(team, role, root, sbuf, soffsets, scounts, dbuf, dcount, el, ch, arg); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_scatterv is not used in Team.x10 } } -bool x10rt_lgl_agree (x10rt_team team, x10rt_place role, +void x10rt_lgl_agree (x10rt_team team, x10rt_place role, const int 
*sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (x10rt_lgl_agreement_support()) { - return x10rt_net_agree(team, role, sbuf, dbuf, errch, ch, arg); - } else { - return false; + x10rt_net_agree(team, role, sbuf, dbuf, ch, arg); } } @@ -1143,20 +1136,18 @@ void x10rt_lgl_gather (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +void x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, errch, ch, arg); + x10rt_net_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, ch, arg); } else { - x10rt_emu_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, errch, ch, arg); + x10rt_emu_gatherv(team, role, root, sbuf, scount, dbuf, doffsets, dcounts, el, ch, arg); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_gatherv is not used by Team.x10 } } @@ -1190,20 +1181,18 @@ void x10rt_lgl_reduce (x10rt_team team, x10rt_place role, } } -bool x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, +void x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { - ESCAPE_IF_ERR_BOOL; + ESCAPE_IF_ERR; if (has_collectives >= X10RT_COLL_ALLBLOCKINGCOLLECTIVES) { - return x10rt_net_allreduce(team, role, sbuf, dbuf, op, dtype, count, errch, ch, arg); + x10rt_net_allreduce(team, role, sbuf, dbuf, op, dtype, count, ch, arg); } else { x10rt_emu_reduce(team, role, 0, sbuf, dbuf, op, dtype, count, ch, arg, true); while (x10rt_emu_coll_probe()); - return true; //TODO: should not always return true, but x10rt_emu_reduce is not used } } diff --git a/x10.runtime/x10rt/include/x10rt_front.h b/x10.runtime/x10rt/include/x10rt_front.h index 0800e40489..f9f470e440 100644 --- a/x10.runtime/x10rt/include/x10rt_front.h +++ b/x10.runtime/x10rt/include/x10rt_front.h @@ -733,10 +733,9 @@ X10RT_C void x10rt_barrier (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_bcast (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** Asynchronously blocks until all members have received their part of root's array. 
Note that @@ -796,11 +795,10 @@ X10RT_C void x10rt_scatter (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_scatterv (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); @@ -858,11 +856,10 @@ X10RT_C void x10rt_gather (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +X10RT_C void x10rt_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** Asynchronously blocks until all members have received their portion of data from each @@ -948,12 +945,11 @@ X10RT_C void x10rt_reduce (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_allreduce (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); @@ -971,15 +967,14 @@ X10RT_C bool x10rt_allreduce (x10rt_team team, x10rt_place role, * * \param arg User pointer that is passed to the completion handler */ -X10RT_C bool x10rt_agree (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** Sets arg to 1. * \param arg Assumed to be an int* */ -X10RT_C void x10rt_one_setter (void *arg); +X10RT_C void x10rt_one_setter (void *arg, bool throwDPE); /** Sets arg to the given team. 
* \param v The new team is passed in here diff --git a/x10.runtime/x10rt/include/x10rt_logical.h b/x10.runtime/x10rt/include/x10rt_logical.h index d84a61c1d5..c85c3410e7 100644 --- a/x10.runtime/x10rt/include/x10rt_logical.h +++ b/x10.runtime/x10rt/include/x10rt_logical.h @@ -398,10 +398,9 @@ X10RT_C void x10rt_lgl_barrier (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_bcast * \param arg As in #x10rt_bcast */ -X10RT_C bool x10rt_lgl_bcast (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_scatter @@ -433,12 +432,11 @@ X10RT_C void x10rt_lgl_scatter (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_scatterv * \param arg As in #x10rt_scatterv */ -X10RT_C bool x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_gather @@ -470,11 +468,10 @@ X10RT_C void x10rt_lgl_gather (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_gatherv * \param arg As in #x10rt_gatherv */ -X10RT_C bool x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +X10RT_C void x10rt_lgl_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_alltoall @@ -521,12 +518,11 @@ X10RT_C void x10rt_lgl_reduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_allreduce * \param arg As in #x10rt_allreduce */ -X10RT_C bool x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_agree @@ -537,9 +533,8 @@ X10RT_C bool x10rt_lgl_allreduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_allreduce * \param arg As in #x10rt_allreduce */ -X10RT_C bool x10rt_lgl_agree (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_lgl_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); #endif diff --git a/x10.runtime/x10rt/include/x10rt_net.h b/x10.runtime/x10rt/include/x10rt_net.h index feb6662212..15f5c7df41 100644 --- a/x10.runtime/x10rt/include/x10rt_net.h +++ b/x10.runtime/x10rt/include/x10rt_net.h @@ -245,10 +245,9 @@ X10RT_C void x10rt_net_barrier (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_bcast * \param arg As in #x10rt_lgl_bcast */ -X10RT_C bool x10rt_net_bcast (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_lgl_scatter @@ -280,11 +279,10 @@ X10RT_C void x10rt_net_scatter (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_scatterv * \param arg As in #x10rt_lgl_scatterv */ -X10RT_C bool 
x10rt_net_scatterv (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_lgl_gather * \param team As in #x10rt_lgl_gather @@ -316,10 +314,9 @@ X10RT_C void x10rt_net_gather (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_gatherv * \param arg As in #x10rt_lgl_gatherv */ -X10RT_C bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, +X10RT_C void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); /** \see #x10rt_lgl_alltoall @@ -366,12 +363,11 @@ X10RT_C void x10rt_net_reduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_lgl_allreduce * \param arg As in #x10rt_lgl_allreduce */ -X10RT_C bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); @@ -383,9 +379,8 @@ X10RT_C bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, * \param ch As in #x10rt_allreduce * \param arg As in #x10rt_allreduce */ -X10RT_C bool x10rt_net_agree (x10rt_team team, x10rt_place role, +X10RT_C void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg); diff --git a/x10.runtime/x10rt/include/x10rt_types.h b/x10.runtime/x10rt/include/x10rt_types.h index 6862da90f1..00df61ba0a 100644 --- a/x10.runtime/x10rt/include/x10rt_types.h +++ b/x10.runtime/x10rt/include/x10rt_types.h @@ -43,7 +43,7 @@ typedef uint32_t x10rt_team; /** User callback to signal that non-blocking operations have completed. */ -typedef void x10rt_completion_handler (void *arg); +typedef void x10rt_completion_handler (void *arg, bool throwDPE); /** User callback to signal that non-blocking team construction operations have completed. 
*/ diff --git a/x10.runtime/x10rt/jni/jni_team.cc b/x10.runtime/x10rt/jni/jni_team.cc index 3c4938efd0..7071266aa0 100644 --- a/x10.runtime/x10rt/jni/jni_team.cc +++ b/x10.runtime/x10rt/jni/jni_team.cc @@ -111,7 +111,7 @@ typedef struct finishOnlyStruct { jobject globalFinishState; } finishOnlyStruct; -static void finishOnlyCallback(void *arg) { +static void finishOnlyCallback(void *arg, bool throwDPE) { finishOnlyStruct* callbackArg = (finishOnlyStruct*)arg; JNIEnv *env = jniHelper_getEnv(); @@ -166,7 +166,7 @@ typedef struct postCopyStruct { void *dstData; } postCopyStruct; -static void postCopyCallback(void *arg) { +static void postCopyCallback(void *arg, bool dummy) { postCopyStruct *callbackArg = (postCopyStruct*)arg; JNIEnv *env = jniHelper_getEnv(); @@ -429,7 +429,7 @@ JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeScatterImpl(JNIEnv *env, * Method: nativeBcastImpl * Signature: (IIILjava/lang/Object;ILjava/lang/Object;IIILx10/lang/FinishState;)V */ -JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeBcastImpl(JNIEnv *env, jclass klazz, +JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeBcastImpl(JNIEnv *env, jclass klazz, jint id, jint role, jint root, jobject src, jint src_off, jobject dst, jint dst_off, @@ -578,7 +578,7 @@ JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeBcastImpl(JNIEnv *env callbackArg->srcData = srcData; callbackArg->dstData = dstData; - return (jobject)x10rt_bcast(id, role, root, srcData, dstData, el, count, &postCopyCallback, &postCopyCallback, callbackArg); + x10rt_bcast(id, role, root, srcData, dstData, el, count, &postCopyCallback, callbackArg); } @@ -854,7 +854,7 @@ JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeReduceImpl(JNIEnv *env, * Method: nativeAllReduceImpl * Signature: (IILjava/lang/Object;ILjava/lang/Object;IIIILx10/lang/FinishState;)V */ -JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeAllReduceImpl(JNIEnv *env, jclass klazz, +JNIEXPORT void JNICALL Java_x10_x10rt_TeamSupport_nativeAllReduceImpl(JNIEnv *env, jclass klazz, jint id, jint role, jobject src, jint src_off, jobject dst, jint dst_off, @@ -954,9 +954,8 @@ JNIEXPORT jobject JNICALL Java_x10_x10rt_TeamSupport_nativeAllReduceImpl(JNIEnv callbackArg->srcData = srcData; callbackArg->dstData = dstData; - //FIXME: how to call the correct failure call back? 
- return (jobject)x10rt_allreduce(id, role, srcData, dstData, (x10rt_red_op_type)op, (x10rt_red_type)typecode, - count, &postCopyCallback, &postCopyCallback, callbackArg); + x10rt_allreduce(id, role, srcData, dstData, (x10rt_red_op_type)op, (x10rt_red_type)typecode, + count, &postCopyCallback, callbackArg); } @@ -976,7 +975,7 @@ typedef struct minmaxStruct { DoubleIdx *dstData; } minmaxStruct; -static void minmaxCallback(void *arg) { +static void minmaxCallback(void *arg, bool dummy) { minmaxStruct *callbackArg = (minmaxStruct*)arg; JNIEnv *env = jniHelper_getEnv(); @@ -1024,7 +1023,7 @@ static void indexOfImpl(JNIEnv *env, jint id, jint role, callbackArg->srcData = srcData; callbackArg->dstData = dstData; - x10rt_allreduce(id, role, srcData, dstData, op, X10RT_RED_TYPE_DBL_S32, 1, &minmaxCallback, &minmaxCallback, callbackArg); + x10rt_allreduce(id, role, srcData, dstData, op, X10RT_RED_TYPE_DBL_S32, 1, &minmaxCallback, callbackArg); } diff --git a/x10.runtime/x10rt/pami/x10rt_pami.cc b/x10.runtime/x10rt/pami/x10rt_pami.cc index 8ec2b6b401..f478a88501 100644 --- a/x10.runtime/x10rt/pami/x10rt_pami.cc +++ b/x10.runtime/x10rt/pami/x10rt_pami.cc @@ -1748,7 +1748,7 @@ static void collective_operation_complete (pami_context_t context, #ifdef DEBUG fprintf(stderr, "Place %u completed collective operation. cookie=%p\n", state.myPlaceId, cookie); #endif - cbd->tcb(cbd->arg); + cbd->tcb(cbd->arg, false); free(cbd->counts); free(cbd->offsets); free(cbd); @@ -1845,8 +1845,8 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, x10rt_completion_hand if (status != PAMI_SUCCESS) error("Unable to post a barrier on team %u", team); } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, - void *dbuf, size_t el, size_t count, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, + void *dbuf, size_t el, size_t count, x10rt_completion_handler *ch, void *arg) { #ifdef DEBUG fprintf(stderr, "Place %u executing broadcast of %lu %lu-byte elements on team %u, with role=%u, root=%u\n", state.myPlaceId, count, el, team, role, root); @@ -1886,7 +1886,6 @@ bool x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const // copy the data for the root separately. PAMI does not do this for us. if (role == root) memcpy(dbuf, sbuf, count*el); - return true; //PAMI not resilient } void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -1928,9 +1927,9 @@ void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, con // The local copy is not needed. PAMI handles this for us. 
} -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, - void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) + void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *ch, void *arg) { x10rt_pami_team_callback *tcb = (x10rt_pami_team_callback *)x10rt_malloc(sizeof(x10rt_pami_team_callback)); if (tcb == NULL) error("Unable to allocate memory for a scatterv callback header"); @@ -1979,7 +1978,6 @@ bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, PAMI_Context_unlock(state.context); #endif if (status != PAMI_SUCCESS) error("Unable to post a scatterv on team %u", team); - return true; //PAMI is not resilient } void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -2019,9 +2017,8 @@ void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, cons if (status != PAMI_SUCCESS) error("Unable to post a gather on team %u", team); } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { x10rt_pami_team_callback *tcb = (x10rt_pami_team_callback *)x10rt_malloc(sizeof(x10rt_pami_team_callback)); @@ -2071,7 +2068,6 @@ bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, con PAMI_Context_unlock(state.context); #endif if (status != PAMI_SUCCESS) error("Unable to post a gatherv on team %u", team); - return true; //PAMI is not resilent } void x10rt_net_alltoall (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, @@ -2209,8 +2205,8 @@ void x10rt_net_reduce (x10rt_team team, x10rt_place role, if (status != PAMI_SUCCESS) error("Unable to post a reduce on team %u", team); } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, - x10rt_red_op_type op, x10rt_red_type dtype, size_t count, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, + x10rt_red_op_type op, x10rt_red_type dtype, size_t count, x10rt_completion_handler *ch, void *arg) { // Issue the collective x10rt_pami_team_callback *tcb = (x10rt_pami_team_callback *)x10rt_malloc(sizeof(x10rt_pami_team_callback)); @@ -2253,16 +2249,13 @@ bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, v PAMI_Context_unlock(state.context); #endif if (status != PAMI_SUCCESS) error("Unable to post an allreduce on team %u", team); - return true; //PAMI is not resilient } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { error("x10rt_net_agree not implemented"); - return false; } /* Registering a place removed callback, never used in this implementation */ diff --git a/x10.runtime/x10rt/sockets/x10rt_sockets.cc b/x10.runtime/x10rt/sockets/x10rt_sockets.cc index 801ed95c72..44239d935e 100644 --- a/x10.runtime/x10rt/sockets/x10rt_sockets.cc +++ 
b/x10.runtime/x10rt/sockets/x10rt_sockets.cc @@ -1618,13 +1618,11 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, x10rt_completion_hand fatal_error("x10rt_net_barrier not implemented"); } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_bcast not implemented"); - return false; } void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -1633,11 +1631,10 @@ void x10rt_net_scatter (x10rt_team team, x10rt_place role, x10rt_place root, con fatal_error("x10rt_net_scatter not implemented"); } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, - void *dbuf, size_t dcount, size_t el,x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, + void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_scatterv not implemented"); - return false; } void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, @@ -1646,13 +1643,11 @@ void x10rt_net_gather (x10rt_team team, x10rt_place role, x10rt_place root, cons fatal_error("x10rt_net_gather not implemented"); } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_gatherv not implemented"); - return false; } void x10rt_net_alltoall (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, @@ -1671,20 +1666,17 @@ void x10rt_net_reduce (x10rt_team team, x10rt_place role, fatal_error("x10rt_net_reduce not implemented"); } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, - x10rt_red_op_type op, x10rt_red_type dtype, size_t count,x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, + x10rt_red_op_type op, x10rt_red_type dtype, size_t count, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_allreduce not implemented"); - return false; } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { fatal_error("x10rt_net_agree not implemented"); - return false; } const char *x10rt_net_error_msg (void) { return context.errorMsg; } diff --git a/x10.runtime/x10rt/standalone/x10rt_standalone.cc b/x10.runtime/x10rt/standalone/x10rt_standalone.cc index e87e12c194..ab7700a1bf 100644 --- a/x10.runtime/x10rt/standalone/x10rt_standalone.cc +++ b/x10.runtime/x10rt/standalone/x10rt_standalone.cc @@ -976,10 +976,9 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, +void x10rt_net_bcast 
(x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); @@ -993,11 +992,10 @@ void x10rt_net_scatter (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, void *dbuf, size_t dcount, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); @@ -1011,11 +1009,10 @@ void x10rt_net_gather (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); @@ -1039,20 +1036,18 @@ void x10rt_net_reduce (x10rt_team team, x10rt_place role, abort(); } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); } -bool x10rt_net_agree (x10rt_team team, x10rt_place role, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { abort(); diff --git a/x10.runtime/x10rt/test/x10rt_coll.cc b/x10.runtime/x10rt/test/x10rt_coll.cc index 97326bcbbb..6d4a6d8b72 100644 --- a/x10.runtime/x10rt/test/x10rt_coll.cc +++ b/x10.runtime/x10rt/test/x10rt_coll.cc @@ -145,7 +145,7 @@ static void coll_test (x10rt_team team, x10rt_place role, x10rt_place per_place) << " correctness (if no warnings follow then OK)..." << std::endl; finished = 0; x10rt_bcast(team, role, root, root==role ? sbuf : NULL, dbuf, - el, count, x10rt_one_setter, x10rt_one_setter, &finished); + el, count, x10rt_one_setter, &finished); while (!finished) { x10rt_aborting_probe(); } for (size_t i=0 ; i Date: Thu, 19 Apr 2018 19:18:20 +1000 Subject: [PATCH 3/6] supporting ULFM2, all low-level collectives return void, and use one call-back function for reporting collective completion --- x10.runtime/x10rt/mpi/x10rt_mpi.cc | 265 ++++++++++++++++------------- 1 file changed, 144 insertions(+), 121 deletions(-) diff --git a/x10.runtime/x10rt/mpi/x10rt_mpi.cc b/x10.runtime/x10rt/mpi/x10rt_mpi.cc index de77d54bdc..40c2032cc3 100644 --- a/x10.runtime/x10rt/mpi/x10rt_mpi.cc +++ b/x10.runtime/x10rt/mpi/x10rt_mpi.cc @@ -7,6 +7,7 @@ * http://www.opensource.org/licenses/eclipse-1.0.php * * (C) Copyright IBM Corporation 2006-2016. + * (C) Copyright Sara Salem Hamouda 2018. */ /* MPICH2 mpi.h wants to not have SEEK_SET etc defined for C++ bindings */ @@ -49,9 +50,15 @@ #if MPI_VERSION >= 3 || (defined(OPEN_MPI) && ( OMPI_MAJOR_VERSION >= 2 || (OMPI_MAJOR_VERSION == 1 && OMPI_MINOR_VERSION >= 8))) || (defined(MVAPICH2_NUMVERSION) && MVAPICH2_NUMVERSION == 10900002) #define X10RT_NONBLOCKING_SUPPORTED true +#ifdef OPEN_MPI_ULFM +#define ULFM2 true +#endif //#define X10RT_MPI3_RMA true // performance hasn't been shown to be better than active messages, so disabled by default. 
Uncomment this line to use RDMA for PUT & GET #else #define X10RT_NONBLOCKING_SUPPORTED false +#ifdef OPEN_MPI_ULFM +#define ULFM1 true +#endif #endif #define X10RT_NET_DEBUG(fmt, ...) do { \ @@ -156,13 +163,21 @@ static inline void release_lock(pthread_mutex_t * lock) { release_lock(&global_state.lock); \ } + #ifdef OPEN_MPI_ULFM void mpiErrorHandler(MPI_Comm * comm, int *errorCode, ...); +#endif + static inline int is_process_failure_error(int mpi_error){ +#ifdef OPEN_MPI_ULFM return mpi_error == MPI_ERR_PROC_FAILED || + mpi_error == MPI_ERR_PROC_FAILED_PENDING || mpi_error == MPI_ERR_REVOKED; -} +#else + return false; #endif +} + /** * Each X10RT API call is broken down into * a X10RT request. Each request of either @@ -543,7 +558,7 @@ x10rt_error x10rt_net_init(int *argc, char ** *argv, x10rt_msg_type *counter) { } else { char *thread_serialized = getenv(X10RT_MPI_THREAD_SERIALIZED); #ifdef OPEN_MPI_ULFM - thread_serialized = "1"; //ULFM does not support MPI_THREAD_MULTIPLE + thread_serialized = "1"; //ULFM1 does not support MPI_THREAD_MULTIPLE #endif int level_required; int level_provided; @@ -708,18 +723,23 @@ x10rt_place x10rt_net_ndead (void) { bool x10rt_net_is_place_dead (x10rt_place p) { #ifdef OPEN_MPI_ULFM - if (p >= global_state.nprocs) return true; - bool found = false; - get_lock(&global_state.lock); - //deadPlaces is not sorted, can't use binary search - for (int i=0; i 0){ + if (p >= global_state.nprocs) return true; + bool found = false; + get_lock(&global_state.lock); + //deadPlaces is not sorted, can't use binary search + for (int i=0; i(ch_); void *arg = reinterpret_cast(arg_); - ch(arg); + ch(arg, false); free(cont); } @@ -2642,7 +2647,7 @@ static void x10rt_net_team_barrier_for_blocking (x10rt_place placec, x10rt_place x10rt_serbuf_free(&b); */ - ch(arg); + ch(arg, false); X10RT_NET_DEBUG("%s", "finished"); } @@ -2697,7 +2702,7 @@ void x10rt_net_team_del (x10rt_team team, x10rt_place role, { X10RT_NET_DEBUG("team=%d, role=%d", team, role); mpi_tdb.releaseTeam(team); - ch(arg); + ch(arg, false); return; } @@ -2741,7 +2746,7 @@ void x10rt_net_team_members (x10rt_team team, x10rt_place *members, x10rt_comple MPI_Group_free(&MPI_GROUP_WORLD); MPI_Group_free(&grp); - ch(arg); + ch(arg, false); } x10rt_place x10rt_net_team_sz (x10rt_team team) @@ -2977,6 +2982,17 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { abort(); \ } \ UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; +#define MPI_AGREEMENT_COLLECTIVE(name, iname, ...) \ + CollectivePostprocess *cp = new CollectivePostprocess(); \ + struct CollectivePostprocessEnv cpe = cp->env; \ + MPI_Request &req = cp->req; \ + LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ + if (MPI_SUCCESS != MPIX_Comm_iagree(__VA_ARGS__, &req)) { \ + fprintf(stderr, "[%s:%d] %s\n", \ + __FILE__, __LINE__, "Error in MPI_" #iname); \ + abort(); \ + } \ + UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; #define MPI_COLLECTIVE_SAVE(var) \ cp->env.env.MPI_COLLECTIVE_NAME.var = var; #define MPI_COLLECTIVE_POSTPROCESS \ @@ -2988,7 +3004,7 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { #define SAVED(var) \ cpe.env.MPI_COLLECTIVE_NAME.var #define MPI_COLLECTIVE_POSTPROCESS_END X10RT_NET_DEBUG("%s: %"PRIxPTR"_%"PRIxPTR,"end postprocess", SAVED(ch), SAVED(arg)); -#elif defined(OPEN_MPI_ULFM) +#else #define MPI_COLLECTIVE(name, iname, ...) 
\ CollectivePostprocessEnv cpe; \ do { LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ @@ -3000,17 +3016,6 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { } \ UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; \ } while(0) -#define MPI_COLLECTIVE_SAVE(var) \ - cpe.env.MPI_COLLECTIVE_NAME.var = var; -#define MPI_COLLECTIVE_POSTPROCESS \ - cpe.ch = ch; \ - cpe.arg = arg; \ - cpe.env.MPI_COLLECTIVE_NAME.mpiError = cpe.mpiError; \ - X10RT_NET_DEBUG("call handler %s","x10rt_net_handler_" TOSTR(MPI_COLLECTIVE_NAME)); \ - CONCAT(x10rt_net_handler_,MPI_COLLECTIVE_NAME)(cpe); -#define SAVED(var) \ - cpe.env.MPI_COLLECTIVE_NAME.var -#define MPI_COLLECTIVE_POSTPROCESS_END X10RT_NET_DEBUG("calling ULFM blocking collective completed, mpi_return_code is: %d", cpe.mpiError); #define MPI_AGREEMENT_COLLECTIVE(name, iname, ...) \ CollectivePostprocessEnv cpe; \ do { LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ @@ -3022,29 +3027,20 @@ MPI_Op mpi_red_op_type(x10rt_red_type dtype, x10rt_red_op_type op) { } \ UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; \ } while(0) -#else -#define MPI_COLLECTIVE(name, iname, ...) \ - CollectivePostprocessEnv cpe; \ - do { LOCK_IF_MPI_IS_NOT_MULTITHREADED; \ - if (MPI_SUCCESS != MPI_##name(__VA_ARGS__)) { \ - fprintf(stderr, "[%s:%d] %s\n", \ - __FILE__, __LINE__, "Error in MPI_" #name); \ - abort(); \ - } \ - UNLOCK_IF_MPI_IS_NOT_MULTITHREADED; \ - } while(0) #define MPI_COLLECTIVE_SAVE(var) \ cpe.env.MPI_COLLECTIVE_NAME.var = var; #define MPI_COLLECTIVE_POSTPROCESS \ cpe.ch = ch; \ cpe.arg = arg; \ + cpe.env.MPI_COLLECTIVE_NAME.mpiError = cpe.mpiError; \ X10RT_NET_DEBUG("call handler %s","x10rt_net_handler_" TOSTR(MPI_COLLECTIVE_NAME)); \ CONCAT(x10rt_net_handler_,MPI_COLLECTIVE_NAME)(cpe); #define SAVED(var) \ cpe.env.MPI_COLLECTIVE_NAME.var -#define MPI_COLLECTIVE_POSTPROCESS_END +#define MPI_COLLECTIVE_POSTPROCESS_END X10RT_NET_DEBUG("calling blocking collective completed, mpi_return_code is: %d", cpe.mpiError); #endif + static void x10rt_net_handler_barrier(CollectivePostprocessEnv); static void x10rt_net_handler_bcast(CollectivePostprocessEnv); static void x10rt_net_handler_scatter(CollectivePostprocessEnv); @@ -3088,16 +3084,22 @@ void x10rt_net_barrier (x10rt_team team, x10rt_place role, static void x10rt_net_handler_barrier (struct CollectivePostprocessEnv cpe) { X10RT_NET_DEBUG("%s: %"PRIxPTR"_%"PRIxPTR,"begin postprocess", SAVED(ch), SAVED(arg)); X10RT_NET_DEBUG("%s","before postprocess"); - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif X10RT_NET_DEBUG("%s","after postprocess"); MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_bcast (x10rt_team team, x10rt_place role, +void x10rt_net_bcast (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, void *dbuf, size_t el, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME bcast @@ -3120,7 +3122,9 @@ bool x10rt_net_bcast (x10rt_team team, x10rt_place role, MPI_Comm comm = mpi_tdb.comm(team); + X10RT_NET_DEBUG("%s", "pre bcast"); MPI_COLLECTIVE(Bcast, Ibcast, buf, count, get_mpi_datatype(el), root, comm); + X10RT_NET_DEBUG("%s", "pro bcast"); MPI_COLLECTIVE_SAVE(team); MPI_COLLECTIVE_SAVE(role); @@ -3135,12 +3139,6 @@ bool x10rt_net_bcast (x10rt_team team, x10rt_place role, MPI_COLLECTIVE_SAVE(buf); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == 
SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_bcast (struct CollectivePostprocessEnv cpe) { @@ -3153,11 +3151,11 @@ static void x10rt_net_handler_bcast (struct CollectivePostprocessEnv cpe) { } #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME @@ -3202,7 +3200,14 @@ static void x10rt_net_handler_scatter (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3244,17 +3249,23 @@ static void x10rt_net_handler_alltoall (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, +void x10rt_net_allreduce (x10rt_team team, x10rt_place role, const void *sbuf, void *dbuf, x10rt_red_op_type op, x10rt_red_type dtype, size_t count, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME allreduce @@ -3280,7 +3291,6 @@ bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, MPI_COLLECTIVE_SAVE(op); MPI_COLLECTIVE_SAVE(dtype); MPI_COLLECTIVE_SAVE(count); - MPI_COLLECTIVE_SAVE(errch); MPI_COLLECTIVE_SAVE(ch); MPI_COLLECTIVE_SAVE(arg); @@ -3288,12 +3298,6 @@ bool x10rt_net_allreduce (x10rt_team team, x10rt_place role, MPI_COLLECTIVE_SAVE(buf); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_allreduce (struct CollectivePostprocessEnv cpe) { @@ -3303,18 +3307,18 @@ static void x10rt_net_handler_allreduce (struct CollectivePostprocessEnv cpe) { } #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, - void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) +void x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, const void *soffsets, const void *scounts, + void *dbuf, size_t dcount, size_t el, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME scatterv assert(global_state.init); @@ -3353,12 +3357,6 @@ bool x10rt_net_scatterv (x10rt_team team, x10rt_place role, x10rt_place root, co MPI_COLLECTIVE_SAVE(soffsets_); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } 
static void x10rt_net_handler_scatterv (struct CollectivePostprocessEnv cpe) { @@ -3373,11 +3371,11 @@ static void x10rt_net_handler_scatterv (struct CollectivePostprocessEnv cpe) { #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME @@ -3421,14 +3419,20 @@ static void x10rt_net_handler_gather (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(gsize) * SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, +void x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, const void *sbuf, size_t scount, void *dbuf, const void *doffsets, const void *dcounts, size_t el, - x10rt_completion_handler *errch, x10rt_completion_handler *ch, void *arg) { #define MPI_COLLECTIVE_NAME gatherv @@ -3463,12 +3467,6 @@ bool x10rt_net_gatherv (x10rt_team team, x10rt_place role, x10rt_place root, con MPI_COLLECTIVE_SAVE(doffsets_); MPI_COLLECTIVE_POSTPROCESS - -#ifdef OPEN_MPI_ULFM - return MPI_SUCCESS == SAVED(mpiError); -#else - return true; -#endif } static void x10rt_net_handler_gatherv (struct CollectivePostprocessEnv cpe) { @@ -3483,11 +3481,11 @@ static void x10rt_net_handler_gatherv (struct CollectivePostprocessEnv cpe) { #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); #endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME @@ -3530,7 +3528,14 @@ static void x10rt_net_handler_allgather (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(gsize) * SAVED(count) * SAVED(el)); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3572,7 +3577,14 @@ void x10rt_net_allgatherv (x10rt_team team, x10rt_place role, const void *sbuf, static void x10rt_net_handler_allgatherv (struct CollectivePostprocessEnv cpe) { free(SAVED(dcounts_)); free(SAVED(doffsets_)); - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3628,12 +3640,19 @@ static void x10rt_net_handler_alltoallv (struct CollectivePostprocessEnv cpe) { free(SAVED(soffsets_)); free(SAVED(dcounts_)); free(SAVED(doffsets_)); - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } -bool x10rt_net_agree 
(x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, x10rt_completion_handler *errch, +void x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *dbuf, x10rt_completion_handler *ch, void *arg) { #ifdef OPEN_MPI_ULFM @@ -3653,19 +3672,15 @@ bool x10rt_net_agree (x10rt_team team, x10rt_place role, const int *sbuf, int *d MPI_COLLECTIVE_SAVE(arg); MPI_COLLECTIVE_POSTPROCESS - - return MPI_SUCCESS == SAVED(mpiError); -#else - return false; #endif } static void x10rt_net_handler_agree(CollectivePostprocessEnv cpe) { #ifdef OPEN_MPI_ULFM if (is_process_failure_error(cpe.mpiError)) - SAVED(errch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), true); else - SAVED(ch)(SAVED(arg)); + SAVED(ch)(SAVED(arg), false); MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME #endif @@ -3737,7 +3752,14 @@ static void x10rt_net_handler_reduce (struct CollectivePostprocessEnv cpe) { memcpy(SAVED(dbuf), SAVED(buf), SAVED(count) * sizeof_dtype(SAVED(dtype))); free(SAVED(buf)); } - SAVED(ch)(SAVED(arg)); +#ifdef OPEN_MPI_ULFM + if (is_process_failure_error(cpe.mpiError)) + SAVED(ch)(SAVED(arg), true); + else + SAVED(ch)(SAVED(arg), false); +#else + SAVED(ch)(SAVED(arg), false); +#endif MPI_COLLECTIVE_POSTPROCESS_END #undef MPI_COLLECTIVE_NAME } @@ -3935,6 +3957,7 @@ void mpiErrorHandler(MPI_Comm * comm, int *errorCode, ...){ if (placeRemovedCB != NULL && newDeadCount > oldDeadCount) { for (int i = oldDeadCount; i < newDeadCount; ++i) { placeRemovedCB(global_state.deadPlaces[i]); + X10RT_NET_DEBUG("Place(%d): MPI found dead Place(%d)", x10rt_net_here(), global_state.deadPlaces[i]); } } MPI_Group_free(&failedGroup); From ad15def8669f53eebe900d39b0766684e269c02a Mon Sep 17 00:00:00 2001 From: u5482878 Date: Thu, 19 Apr 2018 19:20:15 +1000 Subject: [PATCH 4/6] Relying on finish for failure reporting as it was done before ULFM's first integration. Also changes to the way native collectives are called. Now all native collectives return void and use one call-back function for reporting the collective completion status. --- x10.runtime/src-x10/x10/util/Team.x10 | 89 +++++++++++++-------------- 1 file changed, 42 insertions(+), 47 deletions(-) diff --git a/x10.runtime/src-x10/x10/util/Team.x10 b/x10.runtime/src-x10/x10/util/Team.x10 index 1dbd05bd1c..f49c4caacc 100644 --- a/x10.runtime/src-x10/x10/util/Team.x10 +++ b/x10.runtime/src-x10/x10/util/Team.x10 @@ -7,6 +7,7 @@ * http://www.opensource.org/licenses/eclipse-1.0.php * * (C) Copyright IBM Corporation 2006-2016. + * (C) Copyright Sara Salem Hamouda 2018. 
*/ package x10.util; @@ -213,9 +214,7 @@ public struct Team { finish nativeAgree(id, id==0n?here.id() as Int:Team.roles(id), src, dst); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeAgree(id, id==0n?here.id() as Int:Team.roles(id), src, dst); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeAgree(id, id==0n?here.id() as Int:Team.roles(id), src, dst); } } else { throw new UnsupportedOperationException("Emulated agreement not supported"); @@ -223,9 +222,10 @@ public struct Team { return dst(0); } - //TODO: support Java - @Native("c++", "x10rt_agree(#id, #role, #src->raw, #dst->raw, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeAgree (id:Int, role:Int, src:Rail[Int], dst:Rail[Int]) :Boolean = false; + private static def nativeAgree (id:Int, role:Int, src:Rail[Int], dst:Rail[Int]) : void { + //FIXME: support Java + @Native("c++", "x10rt_agree(id, role, src->raw, dst->raw, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } /** Blocks until all members have received their part of root's array. * Each member receives a contiguous and distinct portion of the src array. @@ -253,7 +253,7 @@ public struct Team { finish nativeScatter(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeScatter(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); + finish nativeScatter(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); } else state(id).collective_impl[T](LocalTeamState.COLL_SCATTER, root, src, src_off, dst, dst_off, count, 0n, null, null); @@ -301,22 +301,20 @@ public struct Team { finish nativeScatterv(id, my_role, root.id() as Int, src, src_off as Int, scounts, soffsets, dst, dst_off as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeScatterv(id, my_role, root.id() as Int, src, src_off as Int, scounts, soffsets, dst, dst_off as Int); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeScatterv(id, my_role, root.id() as Int, src, src_off as Int, scounts, soffsets, dst, dst_off as Int); } else{ state(id).collective_impl[T](LocalTeamState.COLL_SCATTERV, root, src, src_off, dst, dst_off, 0n, 0n, soffsets, scounts); } } - //TODO: not supported for Java - //@Native("java", "x10.x10rt.TeamSupport.nativeScatterv(id, role, root, ...);") - @Native("c++", "x10rt_scatterv(#id, #role, #root, #src->raw, #soffsets->raw, #scounts->raw, &(#dst)->raw[#dst_off], #scounts->raw[#role], sizeof(TPMGL(T)), ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeScatterv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, scounts:Rail[Int], soffsets:Rail[Int], dst:Rail[T], dst_off:Int):Boolean = false; - + private static def nativeScatterv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, scounts:Rail[Int], soffsets:Rail[Int], 
dst:Rail[T], dst_off:Int) : void { + //FIXME: support Java + //@Native("java", "x10.x10rt.TeamSupport.nativeScatterv(id, role, root, ...);") + @Native("c++", "x10rt_scatterv(id, role, root, src->raw, soffsets->raw, scounts->raw, &dst->raw[dst_off], scounts->raw[role], sizeof(TPMGL(T)), ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } - //TODO: not supported for Java or PAMI + //FIXME: support Java and PAMI public def gather[T] (root:Place, src:Rail[T], src_off:Long, dst:Rail[T], dst_off:Long, count:Long) : void { if (CompilerFlags.checkBounds() && here == root) checkBounds(dst_off + (size() * count) -1, dst.size); checkBounds(src_off+count-1, src.size); @@ -324,13 +322,13 @@ public struct Team { finish nativeGather(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeGather(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); + finish nativeGather(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); } else state(id).collective_impl[T](LocalTeamState.COLL_GATHER, root, src, src_off, dst, dst_off, count, 0n, null, null); } - //TODO: not supported for Java or PAMI + //FIXME: support Java and PAMI private static def nativeGather[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int) : void { //@Native("java", "x10.x10rt.TeamSupport.nativeGather(id, role, root, src, src_off, dst, dst_off, count);") @Native("c++", "x10rt_gather(id, role, root, &src->raw[src_off], &dst->raw[dst_off], sizeof(TPMGL(T)), count, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} @@ -368,20 +366,19 @@ public struct Team { finish nativeGatherv(id, my_role, root.id() as Int, src, src_off as Int, dst, dst_off as Int, dcounts, doffsets); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeGatherv(id, my_role, root.id() as Int, src, src_off as Int, dst, dst_off as Int, dcounts, doffsets); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeGatherv(id, my_role, root.id() as Int, src, src_off as Int, dst, dst_off as Int, dcounts, doffsets); } else{ state(id).collective_impl[T](LocalTeamState.COLL_GATHERV, root, src, src_off, dst, dst_off, 0n, 0n, doffsets, dcounts); } } - //TODO: not supported for Java - //@Native("java", "x10.x10rt.TeamSupport.nativeGatherv(id, role, root, ...);") - @Native("c++", "x10rt_gatherv(#id, #role, #root, &(#src)->raw[#src_off], #dcounts->raw[#role], #dst->raw, #doffsets->raw, #dcounts->raw, sizeof(TPMGL(T)), ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeGatherv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, dcounts:Rail[Int], doffsets:Rail[Int]) : Boolean = false; - + private static def nativeGatherv[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, dcounts:Rail[Int], doffsets:Rail[Int]) : void { + //FIXME: support Java + //@Native("java", "x10.x10rt.TeamSupport.nativeGatherv(id, role, root, ...);") + @Native("c++", "x10rt_gatherv(id, role, root, &src->raw[src_off], 
dcounts->raw[role], dst->raw, doffsets->raw, dcounts->raw, sizeof(TPMGL(T)), ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } + /** Blocks until all members have received root's array. * * @param root The member who is supplying the data @@ -403,17 +400,16 @@ public struct Team { finish nativeBcast(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - val success = nativeBcast(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeBcast(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int); } else state(id).collective_impl[T](LocalTeamState.COLL_BROADCAST, root, src, src_off, dst, dst_off, count, 0n, null, null); } - @Native("java", "x10.x10rt.TeamSupport.nativeBcast(#id, #role, #root, #src, #src_off, #dst, #dst_off, #count)") - @Native("c++", "x10rt_bcast(#id, #role, #root, &(#src)->raw[#src_off], &(#dst)->raw[#dst_off], sizeof(TPMGL(T)), #count, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter())") - private static def nativeBcast[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int) : Boolean = false; + private static def nativeBcast[T] (id:Int, role:Int, root:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int) : void { + @Native("java", "x10.x10rt.TeamSupport.nativeBcast(id, role, root, src, src_off, dst, dst_off, count);") + @Native("c++", "x10rt_bcast(id, role, root, &src->raw[src_off], &dst->raw[dst_off], sizeof(TPMGL(T)), count, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } /** Blocks until all members have received their part of each other member's array. * Each member receives a contiguous and distinct portion of the src array. 
@@ -445,7 +441,7 @@ public struct Team { if (DEBUG) Runtime.println(here + " entering pre-alltoall barrier of team "+id); barrierIgnoreExceptions(); if (DEBUG) Runtime.println(here + " entering native alltoall of team "+id); - nativeAlltoall(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int); + finish nativeAlltoall(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int); } // XTENLANG-3434 X10 alltoall is broken /* @@ -554,7 +550,7 @@ public struct Team { if (DEBUG) Runtime.println(here + " entering pre-reduce barrier on team "+id); barrierIgnoreExceptions(); if (DEBUG) Runtime.println(here + " entering native reduce on team "+id); - nativeReduce(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int, op); + finish nativeReduce(id, id==0n?here.id() as Int:Team.roles(id), root.id() as Int, src, src_off as Int, dst, dst_off as Int, count as Int, op); if (DEBUG) Runtime.println(here + " Finished native reduce on team "+id); } else { state(id).collective_impl[T](LocalTeamState.COLL_REDUCE, root, src, src_off, dst, dst_off, count, op, null, null); @@ -728,9 +724,7 @@ public struct Team { if (DEBUG) Runtime.println(here + " entering pre-allreduce barrier on team "+id); barrierIgnoreExceptions(); if (DEBUG) Runtime.println(here + " entering native allreduce on team "+id); - val success = nativeAllreduce(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int, op); - if (!success) - throw new DeadPlaceException("[Native] Team "+id+" contains at least one dead member"); + finish nativeAllreduce(id, id==0n?here.id() as Int:Team.roles(id), src, src_off as Int, dst, dst_off as Int, count as Int, op); } else { if (DEBUG) Runtime.println(here + " entering Team.x10 allreduce on team "+id); state(id).collective_impl[T](LocalTeamState.COLL_ALLREDUCE, state(id).places(0), src, src_off, dst, dst_off, count, op, null, null); @@ -738,9 +732,10 @@ public struct Team { if (DEBUG) Runtime.println(here + " Finished allreduce on team "+id); } - @Native("java", "x10.x10rt.TeamSupport.nativeAllReduce(#id, #role, #src, #src_off, #dst, #dst_off, #count, #op)") - @Native("c++", "x10rt_allreduce(#id, #role, &(#src)->raw[#src_off], &(#dst)->raw[#dst_off], (x10rt_red_op_type)(#op), x10rt_get_red_type(), #count, ::x10aux::failed_coll_handler, ::x10aux::coll_handler,::x10aux::coll_enter())") - private static def nativeAllreduce[T](id:Int, role:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int, op:Int):Boolean = false; + private static def nativeAllreduce[T](id:Int, role:Int, src:Rail[T], src_off:Int, dst:Rail[T], dst_off:Int, count:Int, op:Int) : void { + @Native("java", "x10.x10rt.TeamSupport.nativeAllReduce(id, role, src, src_off, dst, dst_off, count, op);") + @Native("c++", "x10rt_allreduce(id, role, &src->raw[src_off], &dst->raw[dst_off], (x10rt_red_op_type)op, x10rt_get_red_type(), count, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + } /** Performs a reduction on a single value, returning the result */ public def allreduce (src:Boolean, op:Int):Boolean { @@ -829,7 +824,7 @@ public struct Team { private static def nativeAllreduce[T](id:Int, role:Int, src:Rail[T], dst:Rail[T], op:Int) : void { @Native("java", "x10.x10rt.TeamSupport.nativeAllReduce(id, role, src, 0, dst, 0, 1, op);") - @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, (x10rt_red_op_type)op, x10rt_get_red_type(), 1, 
::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, (x10rt_red_op_type)op, x10rt_get_red_type(), 1, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} } /** This operation blocks until all members have received the computed result. @@ -847,7 +842,7 @@ public struct Team { finish nativeIndexOfMax(id, id==0n?here.id() as Int:Team.roles(id), src, dst); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeIndexOfMax(id, id==0n?here.id() as Int:Team.roles(id), src, dst); + finish nativeIndexOfMax(id, id==0n?here.id() as Int:Team.roles(id), src, dst); } else state(id).collective_impl[DoubleIdx](LocalTeamState.COLL_INDEXOFMAX, state(id).places(0), src, 0, dst, 0, 1, 0n, null, null); @@ -856,7 +851,7 @@ public struct Team { private static def nativeIndexOfMax(id:Int, role:Int, src:Rail[DoubleIdx], dst:Rail[DoubleIdx]) : void { @Native("java", "x10.x10rt.TeamSupport.nativeIndexOfMax(id, role, src, dst);") - @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MAX, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MAX, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} } /** This operation blocks until all members have received the computed result. @@ -874,7 +869,7 @@ public struct Team { finish nativeIndexOfMin(id, id==0n?here.id() as Int:Team.roles(id), src, dst); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeIndexOfMin(id, id==0n?here.id() as Int:Team.roles(id), src, dst); + finish nativeIndexOfMin(id, id==0n?here.id() as Int:Team.roles(id), src, dst); } else state(id).collective_impl[DoubleIdx](LocalTeamState.COLL_INDEXOFMIN, state(id).places(0), src, 0, dst, 0, 1, 0n, null, null); @@ -883,7 +878,7 @@ public struct Team { private static def nativeIndexOfMin(id:Int, role:Int, src:Rail[DoubleIdx], dst:Rail[DoubleIdx]) : void { @Native("java", "x10.x10rt.TeamSupport.nativeIndexOfMin(id, role, src, dst);") - @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MIN, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::failed_coll_handler, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} + @Native("c++", "x10rt_allreduce(id, role, src->raw, dst->raw, X10RT_RED_OP_MIN, X10RT_RED_TYPE_DBL_S32, 1, ::x10aux::coll_handler, ::x10aux::coll_enter());") {} } /** Create new teams by subdividing an existing team. 
This is called by each member @@ -943,7 +938,7 @@ public struct Team { if (DEBUGINTERNALS) Runtime.println(here + " calling pre-native split barrier on team "+id+" color="+color+" new_role="+new_role); barrierIgnoreExceptions(); if (DEBUGINTERNALS) Runtime.println(here + " calling native split on team "+id+" color="+color+" new_role="+new_role); - nativeSplit(id, id==0n?here.id() as Int:Team.roles(id), color, new_role as Int, result); + finish nativeSplit(id, id==0n?here.id() as Int:Team.roles(id), color, new_role as Int, result); if (DEBUG) Runtime.println(here + " finished native split on team "+id+" color="+color+" new_role="+new_role); return Team(result(0), newTeamPlaceGroup, new_role); } @@ -968,7 +963,7 @@ public struct Team { finish nativeDel(id, id==0n?here.id() as Int:Team.roles(id)); else if (collectiveSupportLevel == X10RT_COLL_ALLBLOCKINGCOLLECTIVES || collectiveSupportLevel == X10RT_COLL_NONBLOCKINGBARRIER) { barrierIgnoreExceptions(); - nativeDel(id, id==0n?here.id() as Int:Team.roles(id)); + finish nativeDel(id, id==0n?here.id() as Int:Team.roles(id)); } // TODO - see if there is something useful to delete with the local team implementation } From f9c080ad5175fa832d6bc21ec6c0498c0fbfc4de Mon Sep 17 00:00:00 2001 From: u5482878 Date: Thu, 19 Apr 2018 19:20:50 +1000 Subject: [PATCH 5/6] fixing a typo in a comment --- x10.tests/tests/MicroBenchmarks/BenchmarkScatterV.x10 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x10.tests/tests/MicroBenchmarks/BenchmarkScatterV.x10 b/x10.tests/tests/MicroBenchmarks/BenchmarkScatterV.x10 index 59fcc1c493..7933ed4442 100644 --- a/x10.tests/tests/MicroBenchmarks/BenchmarkScatterV.x10 +++ b/x10.tests/tests/MicroBenchmarks/BenchmarkScatterV.x10 @@ -30,7 +30,7 @@ public class BenchmarkScatterV extends x10Test { val warmupIn = new Rail[Double](NPLACES); var warmupOut:Rail[Double] = new Rail[Double](1); val warmupCounts = new Rail[Int](NPLACES, 1n); - Team.WORLD.scatterv(root,warmupIn, 0, warmupOut, 0, warmupCounts); // warm up comms layer + Team.WORLD.scatterv(root, warmupIn, 0, warmupOut, 0, warmupCounts); // warm up comms layer for (var s:Long= 1; s <= MAX_S; s++) { var src:Rail[Double] = null; @@ -44,7 +44,7 @@ public class BenchmarkScatterV extends x10Test { src = new Rail[Double](srcSize); var lastPlaceId:Long = 0; var it:Long = SRC_OFFSET; - //initialize the src rail so that each place segment contains its the place id as a value + //initialize the src rail so that each place segment contains its place id as a value while(it Date: Thu, 19 Apr 2018 19:21:34 +1000 Subject: [PATCH 6/6] A micro-benchmark for evaluating Team.agree which maps to ULFM's MPI_COMM_AGREE --- .../tests/MicroBenchmarks/BenchmarkAgree.x10 | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 x10.tests/tests/MicroBenchmarks/BenchmarkAgree.x10 diff --git a/x10.tests/tests/MicroBenchmarks/BenchmarkAgree.x10 b/x10.tests/tests/MicroBenchmarks/BenchmarkAgree.x10 new file mode 100644 index 0000000000..c2fc0941ce --- /dev/null +++ b/x10.tests/tests/MicroBenchmarks/BenchmarkAgree.x10 @@ -0,0 +1,42 @@ +/* + * This file is part of the X10 project (http://x10-lang.org). + * + * This file is licensed to You under the Eclipse Public License (EPL); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.opensource.org/licenses/eclipse-1.0.php + * + * (C) Copyright IBM Corporation 2014-2016. + * (C) Copyright Sara Salem Hamouda 2018. 
+ */ +import harness.x10Test; + +import x10.util.Team; + +/** + * Benchmarks performance of Team.agree + */ +public class BenchmarkAgree extends x10Test { + private static ITERS = 1000; + + public def run(): Boolean { + finish for (place in Place.places()) at (place) async { + Team.WORLD.agree(0n); // warm up comms layer + val start = System.nanoTime(); + for (iter in 1..ITERS) { + val out = Team.WORLD.agree(iter as Int); + // check correctness + chk((iter as Int) == out, here + " agreement not reached expected["+iter+"] found["+out+"]"); + } + val stop = System.nanoTime(); + + if (here == Place.FIRST_PLACE) Console.OUT.printf("agree: %g ms\n", ((stop-start) as Double) / 1e6 / ITERS); + } + + return true; + } + + public static def main(var args: Rail[String]): void { + new BenchmarkAgree().execute(); + } +}
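
Note (not part of the patches above): a minimal, self-contained C++ sketch of the completion-callback contract this series converges on. The typedef, the helper name fire_completion, and example_handler are illustrative assumptions rather than code from x10rt; they only mirror how the MPI handlers in this series invoke SAVED(ch)(SAVED(arg), true/false) under OPEN_MPI_ULFM and how all other paths pass false.

    // Sketch only: models the single completion callback with a place-failure flag.
    // The typedef below is an assumption inferred from how the handlers call
    // SAVED(ch)(SAVED(arg), true/false); the real definition lives in the x10rt headers.
    #include <cstdio>

    typedef void x10rt_completion_handler(void *arg, bool placeFailure);

    // Hypothetical helper illustrating the dispatch each MPI collective handler repeats:
    // report completion through one callback, passing true only when ULFM detected a
    // dead team member.
    static void fire_completion(x10rt_completion_handler *ch, void *arg, bool ulfmProcessFailure) {
    #ifdef OPEN_MPI_ULFM
        ch(arg, ulfmProcessFailure);   // true => place failure; the X10 side reports it via the enclosing finish
    #else
        (void)ulfmProcessFailure;      // backends without failure detection always report success
        ch(arg, false);
    #endif
    }

    // Toy callback standing in for ::x10aux::coll_handler, which notifies the waiting finish
    // (and, when the flag is set, records the dead-member failure first).
    static void example_handler(void *arg, bool placeFailure) {
        std::printf("collective done for %p, placeFailure=%s\n", arg, placeFailure ? "true" : "false");
    }

    int main() {
        int dummyFinishState = 0;
        fire_completion(example_handler, &dummyFinishState, /*ulfmProcessFailure=*/false);
        return 0;
    }

Collapsing the separate error callback into one handler with a boolean flag keeps every backend on the same notification path: ULFM-aware code passes true when a team member has died, while the other transports simply pass false and leave failure reporting to the finish on the X10 side.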