Skip to content

Commit

Permalink
TODO: c0appz_mw_async() created, write_data_to_object_async bug fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
nezzzu committed Jul 20, 2020
1 parent 9ba1069 commit a8bb9cc
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 24 deletions.
153 changes: 129 additions & 24 deletions c0appz.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ int c0appz_cp(uint64_t idhi, uint64_t idlo, char *filename,

/*
* c0appz_cp_async()
* copy to and object in an async manner
* copy to an object from a file in an asynchronous manner
*/
int c0appz_cp_async(uint64_t idhi, uint64_t idlo, char *src, uint64_t bsz,
uint64_t cnt, uint64_t op_cnt)
Expand Down Expand Up @@ -344,6 +344,109 @@ int c0appz_cp_async(uint64_t idhi, uint64_t idlo, char *src, uint64_t bsz,
return rc;
}

/*
* c0appz_mw_async()
* copy to an object from a memory buffer in an asynchronous manner
* TODO: change copy from file to copy from buffer
*/
int c0appz_mw_async(uint64_t idhi, uint64_t idlo, char *src, uint64_t bsz,
uint64_t cnt, uint64_t op_cnt)
{
int i;
int rc = 0;
int nr_ops_sent;
uint64_t last_index;
struct m0_uint128 id;
struct clovis_aio_op *aio;
struct clovis_aio_opgrp aio_grp;
FILE *fp;

assert(cnt % op_cnt == 0);

/* open file */
fp = fopen(src, "rb");
if (fp == NULL) {
fprintf(stderr,"error! could not open output file %s\n", src);
return 1;
}

/* ids */
id.u_hi = idhi;
id.u_lo = idlo;

last_index = 0;
while (cnt > 0) {
/* Initialise operation group. */
rc = clovis_aio_opgrp_init(&aio_grp, op_cnt, op_cnt);
if (rc != 0) {
fclose(fp);
return rc;
}

/* Open the object. */
m0_clovis_obj_init(&aio_grp.cag_obj, &clovis_uber_realm,
&id, M0_DEFAULT_LAYOUT_ID);
open_entity(&aio_grp.cag_obj.ob_entity);

/* Set each op. */
nr_ops_sent = 0;
for (i = 0; i < op_cnt; i++) {
aio = &aio_grp.cag_aio_ops[i];
rc = clovis_aio_vec_alloc(aio, 1, bsz);
if (rc != 0)
break;

aio->cao_ext.iv_index[0] = last_index;
aio->cao_ext.iv_vec.v_count[0] = bsz;
last_index += bsz;

aio->cao_attr.ov_vec.v_count[0] = 0;

/* Read data from source file. */
rc = read_data_from_file(fp, &aio->cao_data, bsz);
M0_ASSERT(rc == 1);

/* Launch IO op. */
rc = write_data_to_object_async(aio);
if (rc != 0) {
fprintf(stderr, "Writing to object failed!\n");
break;
}
nr_ops_sent++;
}

/* Wait for all ops to complete. */
for (i = 0; i < nr_ops_sent; i++)
m0_semaphore_down(&aio_grp.cag_sem);

/* Finalise ops and group. */
rc = rc ?: aio_grp.cag_rc;
for (i = 0; i < nr_ops_sent; i++) {
aio = &aio_grp.cag_aio_ops[i];
m0_clovis_op_fini(aio->cao_op);
m0_clovis_op_free(aio->cao_op);
clovis_aio_vec_free(aio);
}
m0_clovis_entity_fini(&aio_grp.cag_obj.ob_entity);
clovis_aio_opgrp_fini(&aio_grp);

/* Not all ops are launched and executed successfully. */
if (rc != 0)
break;

/* QOS */
pthread_mutex_lock(&qos_lock);
qos_total_weight += op_cnt * bsz;
pthread_mutex_unlock(&qos_lock);
/* END */

cnt -= op_cnt;
}

fclose(fp);
return rc;
}

/*
* c0appz_cat()
* cat object.
Expand Down Expand Up @@ -1104,29 +1207,31 @@ int write_data_to_object(struct m0_uint128 id,
*/
static int write_data_to_object_async(struct clovis_aio_op *aio)
{
struct m0_clovis_obj *obj;
struct m0_clovis_op_ops op_ops;
struct clovis_aio_opgrp *grp;

grp = aio->cao_grp;
obj = &grp->cag_obj;

/* Create an WRITE op. */
m0_clovis_obj_op(obj, M0_CLOVIS_OC_WRITE,
&aio->cao_ext, &aio->cao_data,
&aio->cao_attr, 0, &aio->cao_op);
aio->cao_op->op_datum = aio;

/* Set op's Callbacks */
op_ops.oop_executed = clovis_aio_executed_cb;
op_ops.oop_stable = clovis_aio_stable_cb;
op_ops.oop_failed = clovis_aio_failed_cb;
m0_clovis_op_setup(aio->cao_op, &op_ops, 0);

/* Launch the write request */
m0_clovis_op_launch(&aio->cao_op, 1);

return 0;
struct m0_clovis_obj *obj;
struct m0_clovis_op_ops *op_ops;
struct clovis_aio_opgrp *grp;

grp = aio->cao_grp;
obj = &grp->cag_obj;
M0_ALLOC_PTR(op_ops);
if (op_ops != NULL) {
/* Create an WRITE op. */
m0_clovis_obj_op(obj, M0_CLOVIS_OC_WRITE,
&aio->cao_ext, &aio->cao_data,
&aio->cao_attr, 0, &aio->cao_op);
aio->cao_op->op_datum = aio;

/* Set op's Callbacks */
op_ops->oop_executed = clovis_aio_executed_cb;
op_ops->oop_stable = clovis_aio_stable_cb;
op_ops->oop_failed = clovis_aio_failed_cb;
m0_clovis_op_setup(aio->cao_op, op_ops, 0);

/* Launch the write request */
m0_clovis_op_launch(&aio->cao_op, 1);
}

return 0;
}

/*
Expand Down
2 changes: 2 additions & 0 deletions c0appz.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ int c0appz_ct(uint64_t idhi,uint64_t idlo,char *filename,uint64_t bsz,uint64_t c
int c0appz_cp(uint64_t idhi,uint64_t idlo,char *filename,uint64_t bsz,uint64_t cnt);
int c0appz_cp_async(uint64_t idhi,uint64_t idlo,char *src,uint64_t block_size,
uint64_t block_count,uint64_t op_cnt);
int c0appz_mw_async(uint64_t idhi, uint64_t idlo, char *src, uint64_t bsz,
uint64_t cnt, uint64_t op_cnt);

int c0appz_setrc(char *rcfile);
int c0appz_putrc(void);
Expand Down

0 comments on commit a8bb9cc

Please sign in to comment.