Skip to content

Commit

Permalink
ISC Demo
Browse files Browse the repository at this point in the history
* Contains a utility c0isc_reg to load a library across all
  running instances of Mero that host ISC service.
* Introduces use-cases for ISC. These use-cases can be
  ivoked using c0isc_demo
* README.isc discusses the details.
  • Loading branch information
nsahasra-seagate authored and andriytk committed Jul 8, 2020
1 parent 4b1c696 commit 2f6b924
Show file tree
Hide file tree
Showing 9 changed files with 1,170 additions and 1 deletion.
21 changes: 20 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ EXE3 = c0rm
EXE4 = fgen
EXE5 = c0cp_async

#isc executables and library
LIBISC = libdemo.so
ISC_REG = c0isc_reg
ISC_INVK = c0isc_demo

#archieve/node names
TARF = m0trace_$(shell date +%Y%m%d-%H%M%S).tar.bz2
TARN = $(shell ls -la m0trace.* &> /dev/null | wc -l)
Expand Down Expand Up @@ -142,10 +147,14 @@ vmrcf:
mkdir -p .$(EXE2)rc
mkdir -p .$(EXE3)rc
mkdir -p .$(EXE5)rc
mkdir -p .$(ISC_REG)rc
mkdir -p .$(ISC_INVK)rc
./scripts/c0appzrcgen > ./.$(EXE1)rc/$(NODE)
./scripts/c0appzrcgen > ./.$(EXE2)rc/$(NODE)
./scripts/c0appzrcgen > ./.$(EXE3)rc/$(NODE)
./scripts/c0appzrcgen > ./.$(EXE5)rc/$(NODE)
./scripts/c0appzrcgen > ./.$(ISC_REG)rc/$(NODE)
./scripts/c0appzrcgen > ./.$(ISC_INVK)rc/$(NODE)

sagercf:
mkdir -p .${EXE1}rc
Expand Down Expand Up @@ -203,7 +212,17 @@ mpi-sagercf:
mkdir -p .${EXE6}rc
sage-user-application-assignment ganesan $(EXE6) 172.18.1.${c} > .$(EXE6)rc/client-${c}


$(ISC_REG):
gcc c0appz.c c0isc_register.c -I/usr/include/mero -g $(CFLAGS) $(LFLAGS) -o $(ISC_REG)
$(LIBISC):
gcc isc_libdemo.c -I/usr/include/mero $(CFLAGS) -fpic -shared -o $(LIBISC)
$(ISC_INVK):
gcc c0appz.c c0isc_demo.c -I/usr/include/mero -g $(CFLAGS) $(LFLAGS) -o $(ISC_INVK)
isc-all: $(ISC_REG) $(ISC_INVK) $(LIBISC)
isc-clean:
rm -f $(ISC_REG) $(ISC_INVK)
isc-lib-clean:
rm -f $(LIBISC)
#
#ECMWF Appz
#
Expand Down
86 changes: 86 additions & 0 deletions README.isc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@


Preparing a library
===================

APIs from an external library can't be linked directly with a mero instance.
A library is supposed to have a function named "void mero_lib_init(void)".
All APIs from a library shall confine to the following signature:

int comp(struct m0_buf *args, struct m0_buf *out,
struct m0_isc_comp_private *comp_data, int *rc)

See isc_lib_demo.c for a usecase.

Loading a library
=================

Using a spiel command (@see "spiel/spiel.h" and "c0appz_isc.h") a library can
be loaded with any running mero instance. A helper function
"c0appz_isc_api_register" (c0appz_isc.h) takes a path of the library, which is
expected to be identical across all the nodes hosting a mero instance. Currently
a utility c0isc_reg takes the path as an input and loads the library with remote
mero instances.

On successful loading of the library the output looks like:

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
[devvm@sherlock clovis-sample-apps]$ ./c0isc_reg $PWD/libdemo.so
.c0isc_regrc/sherlock
[ cput: 1.11 s 0.00 Mbs ][ wclk: 7.20 s 0.00 Mbs ]
c0isc_reg success
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Demo computations
=================

Currently we demonstrate three simple computations. The utility c0isc_demo can
be used to invoke any of the computations supported. Following are the steps
to view the demo output.
1. make isc-clean
2. make isc-lib-clean
3. make isc-all

The second step builds a library libdemo.so

ping
----
This functionality pings all ISC services present in Mero configuration and
returns "Hello-World@<service-fid>" till it encounters a service that's down.

min / max
--------

This functionality reads a file (c0isc_data) co-located with the utility and
identifies the min or max depending upon the input option provided. On
successful completion it shows following output format:

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
[devvm@sherlock clovis-sample-apps]$ ./c0isc_demo min
.c0isc_demorc/sherlock

idx=13val=-14234.430000
[ cput: 1.07 s 0.00 Mbs ][ wclk: 4.12 s 0.00 Mbs ]
c0isc_demo success
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

It assumes the presence of file"c0isc_data" and it prints the index
of the optimal (min or max) value and its index (indexing starting with 0).
c0isc_demo.c contains a code which can be extended to develop any application.

What's not present ?
====================

Data serialisation is necessary for communicating with remote service. Mero
serialises using gccxml and an internal utility gccxml2xcode. Incorporating
this utility in Mero rpm is currently scheduled and not part of current
Mero master. The demo includes a simplistic serialisation technique and
is not scalable as is.

Async communication with remote services is possible but the demo does not
include the same. For that kindly review cases from iscservice/ut/service_ut.c

Currently conducting an IO from server-side is not easily doable. It requires all
the operations that ioservice conducts needs to be done. A simplified API
for server side IO is under progress. Hence result of the computation can not
be persisted as of now.
230 changes: 230 additions & 0 deletions c0appz.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,21 @@
#include "c0appz.h"
#include "c0params.h"
#include "clovis/clovis.h"
#include "clovis/clovis_internal.h"
#include "clovis/clovis_idx.h"
#include "conf/confc.h" /* m0_confc_open_sync */
#include "conf/dir.h" /* m0_conf_dir_len */
#include "conf/helpers.h" /* m0_confc_root_open */
#include "conf/diter.h" /* m0_conf_diter_next_sync */
#include "conf/obj_ops.h" /* M0_CONF_DIREND */
#include "spiel/spiel.h" /* m0_spiel_process_lib_load */
#include "reqh/reqh.h" /* m0_reqh */
#include "lib/types.h" /* uint32_t */

#include "iscservice/isc.h"
#include "lib/buf.h"
#include "rpc/rpclib.h"
#include "c0appz_isc.h"

#ifndef DEBUG
#define DEBUG 0
Expand All @@ -54,6 +68,7 @@ static struct m0_clovis_container clovis_container;
static struct m0_clovis_realm clovis_uber_realm;
static struct m0_clovis_config clovis_conf;
static struct m0_idx_dix_config dix_conf;
static struct m0_spiel spiel_inst;

static char c0rc[8][SZC0RCSTR];
static char c0rcfile[SZC0RCFILE] = C0RCFLE;
Expand Down Expand Up @@ -522,6 +537,221 @@ int c0appz_rm(uint64_t idhi,uint64_t idlo)
return rc;
}

int c0appz_rmach_bulk_cutoff(struct m0_rpc_link *link, uint32_t *bulk_cutoff)
{
if (link == NULL || bulk_cutoff == NULL)
return -EINVAL;
*bulk_cutoff = link->rlk_conn.c_rpc_machine->rm_bulk_cutoff;
return 0;
}

static int c0appz_spiel_prepare(struct m0_spiel *spiel)
{
struct m0_reqh *reqh;
int rc;

reqh = &clovis_instance->m0c_reqh;
rc = m0_spiel_init(spiel, reqh);
if (rc != 0) {
fprintf(stderr, "error! spiel initialisation failed.\n");
return rc;
}

rc = m0_spiel_cmd_profile_set(spiel, clovis_conf.cc_profile);
if (rc != 0) {
fprintf(stderr, "error! spiel initialisation failed.\n");
return rc;
}
rc = m0_spiel_rconfc_start(spiel, NULL);
if (rc != 0) {
fprintf(stderr, "error! starting of rconfc failed in spiel failed.\n");
return rc;
}

return 0;
}

static void c0appz_spiel_destroy(struct m0_spiel *spiel)
{
m0_spiel_rconfc_stop(spiel);
m0_spiel_fini(spiel);
}

static bool conf_obj_is_proc(const struct m0_conf_obj *obj)
{
return m0_conf_obj_type(obj) == &M0_CONF_PROCESS_TYPE;
}

struct m0_rpc_link * c0appz_isc_rpc_link_get(struct m0_fid *svc_fid)
{
struct m0_reqh *reqh = &clovis_instance->m0c_reqh;
struct m0_reqh_service_ctx *ctx;
struct m0_pools_common *pc = reqh->rh_pools;

m0_tl_for(pools_common_svc_ctx, &pc->pc_svc_ctxs, ctx) {
if (ctx->sc_type == M0_CST_ISCS &&
m0_fid_eq(&ctx->sc_fid, svc_fid))
return &ctx->sc_rlink;
} m0_tl_endfor;
return NULL;
}
/*
* Loads a library into m0d instances.
*/
int c0appz_isc_api_register(const char *libpath)
{
int rc;
struct m0_reqh *reqh;
struct m0_confc *confc;
struct m0_conf_root *root;
struct m0_conf_process *proc;
struct m0_conf_obj *obj;
struct m0_conf_diter it;

rc = c0appz_spiel_prepare(&spiel_inst);
if (rc != 0) {
fprintf(stderr, "error! spiel initialization failed");
return rc;
}
reqh = &clovis_instance->m0c_reqh;
confc = m0_reqh2confc(reqh);
rc = m0_confc_root_open(confc, &root);
if (rc != 0) {
c0appz_spiel_destroy(&spiel_inst);
return rc;
}

rc = m0_conf_diter_init(&it, confc,
&root->rt_obj,
M0_CONF_ROOT_NODES_FID,
M0_CONF_NODE_PROCESSES_FID);
if (rc != 0) {
m0_confc_close(&root->rt_obj);
c0appz_spiel_destroy(&spiel_inst);
return rc;
}
while ((rc = m0_conf_diter_next_sync(&it, conf_obj_is_proc)) !=
M0_CONF_DIRNEXT)
; /* Skip over non-proc objects. */
for (obj = m0_conf_diter_result(&it); rc == M0_CONF_DIRNEXT;
rc = m0_conf_diter_next_sync(&it, conf_obj_is_proc)) {
obj = m0_conf_diter_result(&it);
proc = M0_CONF_CAST(obj, m0_conf_process);
rc = m0_spiel_process_lib_load(&spiel_inst,
&proc->pc_obj.co_id,
libpath);
if (rc != 0) {
fprintf(stderr, "error! loading the library %s"
"failed for process\n", libpath);
m0_conf_diter_fini(&it);
m0_confc_close(&root->rt_obj);
c0appz_spiel_destroy(&spiel_inst);
return rc;
}
}
m0_conf_diter_fini(&it);
m0_confc_close(&root->rt_obj);
c0appz_spiel_destroy(&spiel_inst);
return 0;
}

int c0appz_isc_nxt_svc_get(struct m0_fid *svc_fid, struct m0_fid *nxt_fid,
enum m0_conf_service_type s_type)
{
struct m0_reqh *reqh = &clovis_instance->m0c_reqh;
struct m0_reqh_service_ctx *ctx;
struct m0_pools_common *pc = reqh->rh_pools;
struct m0_fid null_fid = M0_FID0;
struct m0_fid current_fid = *svc_fid;

m0_tl_for(pools_common_svc_ctx, &pc->pc_svc_ctxs, ctx) {
if (ctx->sc_type == s_type && m0_fid_eq(&current_fid,
&null_fid)) {
*nxt_fid = ctx->sc_fid;
return 0;
} else if (ctx->sc_type == s_type &&
m0_fid_eq(svc_fid, &ctx->sc_fid))
current_fid = null_fid;
} m0_tl_endfor;
*nxt_fid = M0_FID0;
return -ENOENT;
}

int c0appz_isc_req_prepare(struct c0appz_isc_req *req, struct m0_buf *args,
const struct m0_fid *comp_fid,
struct m0_buf *reply_buf, struct m0_fid *svc_fid,
uint32_t reply_len)
{

struct m0_fop_isc *fop_isc = &req->cir_isc_fop;
struct m0_fop *arg_fop = &req->cir_fop;
struct m0_rpc_link *link;
int rc;

req->cir_args = args;
req->cir_result = reply_buf;
fop_isc->fi_comp_id = *comp_fid;
req->cir_rpc_link = c0appz_isc_rpc_link_get(svc_fid);
if (req->cir_rpc_link == NULL) {
fprintf(stderr, "error! isc request can not be prepared for"
"process "FID_F, FID_P(svc_fid));
return -EINVAL;
}
link = req->cir_rpc_link;
m0_rpc_at_init(&fop_isc->fi_args);
rc = m0_rpc_at_add(&fop_isc->fi_args, args, &link->rlk_conn);
if (rc != 0) {
m0_rpc_at_fini(&fop_isc->fi_args);
fprintf(stderr, "error! m0_rpc_at_add() failed with %d\n", rc);
return rc;
}
/* Initialise the reply RPC AT buffer to be received.*/
m0_rpc_at_init(&fop_isc->fi_ret);
rc = m0_rpc_at_recv(&fop_isc->fi_ret, &link->rlk_conn,
reply_len, false);
if (rc != 0) {
m0_rpc_at_fini(&fop_isc->fi_args);
m0_rpc_at_fini(&fop_isc->fi_ret);
fprintf(stderr, "error! m0_rpc_at_recv() failed with %d\n", rc);
return rc;
}
m0_fop_init(arg_fop, &m0_fop_isc_fopt, fop_isc, m0_fop_release);
return rc;
}

int c0appz_isc_req_send_sync(struct c0appz_isc_req *req)
{
struct m0_fop *reply_fop;
struct m0_fop_isc_rep isc_reply;
int rc;

rc = m0_rpc_post_sync(&req->cir_fop, &req->cir_rpc_link->rlk_sess, NULL,
M0_TIME_IMMEDIATELY);
if (rc != 0) {
fprintf(stderr, "error! request could not be sent");
return rc;
}
reply_fop = m0_rpc_item_to_fop(req->cir_fop.f_item.ri_reply);
isc_reply = *(struct m0_fop_isc_rep *)m0_fop_data(reply_fop);
req->cir_rc = isc_reply.fir_rc;
rc = m0_rpc_at_rep_get(&req->cir_isc_fop.fi_ret, &isc_reply.fir_ret,
req->cir_result);
if (rc != 0)
fprintf(stderr, "\nerror! m0_rpc_at_get returned %d\n", rc);
return req->cir_rc == 0 ? rc : req->cir_rc;
}

void c0appz_isc_req_fini(struct c0appz_isc_req *req)
{
struct m0_fop *reply_fop;

reply_fop = m0_rpc_item_to_fop(req->cir_fop.f_item.ri_reply);
if (reply_fop != NULL)
m0_fop_put_lock(reply_fop);
m0_rpc_at_fini(&req->cir_isc_fop.fi_args);
m0_rpc_at_fini(&req->cir_isc_fop.fi_ret);
}

/*
* c0appz_ex()
* object exists test.
Expand Down
Loading

0 comments on commit 2f6b924

Please sign in to comment.