diff --git a/docs/source/example.rst b/docs/source/example.rst index d1a23e8..efac89e 100644 --- a/docs/source/example.rst +++ b/docs/source/example.rst @@ -39,7 +39,7 @@ The H5ESwait is also required before closing an event set, with: HDF5 Asynchronous I/O API ------------------------- -The HDF5-1.13 and later versions provide an asynchronous version of all the HDF5 I/O operations. A complete list of them can be found in the API section. Applications need to switch their existing HDF5 function calls to their asynchronous version, which can be done by adding "_async" to the end of the HDF5 function name and adding an event set ID as the last parameter to the function parameter list. One can also maintain both the original synchronous calls and asynchronous with a MACRO and decides which to use at compile time, e.g.,: +The HDF5-1.14 and later versions provide an asynchronous version of all the HDF5 I/O operations. A complete list of them can be found in the API section. Applications need to switch their existing HDF5 function calls to their asynchronous version, which can be done by adding "_async" to the end of the HDF5 function name and adding an event set ID as the last parameter to the function parameter list. One can also maintain both the original synchronous calls and asynchronous with a MACRO and decides which to use at compile time, e.g.,: .. code-block:: diff --git a/docs/source/hdf5api.rst b/docs/source/hdf5api.rst index d76b319..0578dab 100644 --- a/docs/source/hdf5api.rst +++ b/docs/source/hdf5api.rst @@ -1,6 +1,6 @@ HDF5 Async APIs =============== -HDF5-1.13 and later versions added a number of new APIs that allow applications to take advantage of the asynchronous I/O feature provided by the asynchronous I/O VOL connector. They are part of the HDF5 public header, so users only need to include the HDF5 header file (hdf5.h) to use them. +HDF5-1.14 and later versions added a number of new APIs that allow applications to take advantage of the asynchronous I/O feature provided by the asynchronous I/O VOL connector. They are part of the HDF5 public header, so users only need to include the HDF5 header file (hdf5.h) to use them. EventSet APIs ------------- diff --git a/src/h5_async_vol.c b/src/h5_async_vol.c index 5d8a020..9bff1ca 100644 --- a/src/h5_async_vol.c +++ b/src/h5_async_vol.c @@ -156,10 +156,8 @@ typedef struct async_task_t { char * name; task_type_t type; - struct H5VL_async_t *parent_obj; /* pointer back to the parent async object */ -#if H5_VERSION_GE(1, 13, 3) + struct H5VL_async_t * parent_obj; /* pointer back to the parent async object */ struct H5VL_async_t **parent_objs; /* pointer back to the parent async object when multi-dset is used*/ -#endif clock_t create_time; clock_t start_time; @@ -326,7 +324,6 @@ typedef struct async_dataset_open_args_t { } async_dataset_open_args_t; typedef struct async_dataset_read_args_t { -#if H5_VERSION_GE(1, 13, 3) size_t count; void **dset; hid_t *mem_type_id; @@ -335,19 +332,9 @@ typedef struct async_dataset_read_args_t { hid_t plist_id; void **buf; void **req; -#else - void * dset; - hid_t mem_type_id; - hid_t mem_space_id; - hid_t file_space_id; - hid_t plist_id; - void * buf; - void **req; -#endif } async_dataset_read_args_t; typedef struct async_dataset_write_args_t { -#if H5_VERSION_GE(1, 13, 3) size_t count; void **dset; hid_t *mem_type_id; @@ -356,15 +343,6 @@ typedef struct async_dataset_write_args_t { hid_t plist_id; void **buf; void **req; -#else - void * dset; - hid_t mem_type_id; - hid_t mem_space_id; - hid_t file_space_id; - hid_t plist_id; - void * buf; - void **req; -#endif #ifdef ENABLE_WRITE_MEMCPY bool free_buf; hsize_t data_size; @@ -700,22 +678,15 @@ static herr_t H5VL_async_attr_optional(void *obj, H5VL_optional_args_t *args, hi static herr_t H5VL_async_attr_close(void *attr, hid_t dxpl_id, void **req); /* Dataset callbacks */ -static void *H5VL_async_dataset_create(void *obj, const H5VL_loc_params_t *loc_params, const char *name, - hid_t lcpl_id, hid_t type_id, hid_t space_id, hid_t dcpl_id, - hid_t dapl_id, hid_t dxpl_id, void **req); -static void *H5VL_async_dataset_open(void *obj, const H5VL_loc_params_t *loc_params, const char *name, - hid_t dapl_id, hid_t dxpl_id, void **req); -#if H5_VERSION_GE(1, 13, 3) +static void * H5VL_async_dataset_create(void *obj, const H5VL_loc_params_t *loc_params, const char *name, + hid_t lcpl_id, hid_t type_id, hid_t space_id, hid_t dcpl_id, + hid_t dapl_id, hid_t dxpl_id, void **req); +static void * H5VL_async_dataset_open(void *obj, const H5VL_loc_params_t *loc_params, const char *name, + hid_t dapl_id, hid_t dxpl_id, void **req); static herr_t H5VL_async_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t mem_space_id[], hid_t file_space_id[], hid_t plist_id, void *buf[], void **req); static herr_t H5VL_async_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t mem_space_id[], hid_t file_space_id[], hid_t plist_id, const void *buf[], void **req); -#else -static herr_t H5VL_async_dataset_read(void *dset, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, - hid_t plist_id, void *buf, void **req); -static herr_t H5VL_async_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, - hid_t plist_id, const void *buf, void **req); -#endif static herr_t H5VL_async_dataset_get(void *dset, H5VL_dataset_get_args_t *args, hid_t dxpl_id, void **req); static herr_t H5VL_async_dataset_specific(void *obj, H5VL_dataset_specific_args_t *args, hid_t dxpl_id, void **req); @@ -789,11 +760,7 @@ static herr_t H5VL_async_object_optional(void *obj, const H5VL_loc_params_t *loc /* Container/connector introspection callbacks */ static herr_t H5VL_async_introspect_get_conn_cls(void *obj, H5VL_get_conn_lvl_t lvl, const H5VL_class_t **conn_cls); -#if H5_VERSION_GE(1, 13, 3) static herr_t H5VL_async_introspect_get_cap_flags(const void *info, uint64_t *cap_flags); -#else -static herr_t H5VL_async_introspect_get_cap_flags(const void *info, unsigned *cap_flags); -#endif static herr_t H5VL_async_introspect_opt_query(void *obj, H5VL_subclass_t cls, int opt_type, uint64_t *flags); /* Async request callbacks */ @@ -2055,10 +2022,8 @@ free_async_task(async_task_t *task, const char *call_func) if (task->abt_thread) ABT_thread_free(&task->abt_thread); -#if H5_VERSION_GE(1, 13, 3) if (task->parent_objs) free(task->parent_objs); -#endif if (task->name) free(task->name); @@ -8376,7 +8341,6 @@ async_dataset_open(task_type_t qtype, async_instance_t *aid, H5VL_async_t *paren return NULL; } // End async_dataset_open -#if H5_VERSION_GE(1, 13, 3) static void async_dataset_read_fn(void *foo) { @@ -8545,7 +8509,7 @@ async_dataset_read_fn(void *foo) func_leave(__func__); return; -} // End async_dataset_read_fn > 1.13.3 +} // End async_dataset_read_fn static herr_t async_dataset_read(async_instance_t *aid, size_t count, H5VL_async_t **parent_obj, hid_t mem_type_id[], @@ -8766,7 +8730,7 @@ async_dataset_read(async_instance_t *aid, size_t count, H5VL_async_t **parent_ob } return -1; -} // End async_dataset_read > 1.13.3 +} // End async_dataset_read #ifdef ENABLE_MERGE_DSET static herr_t @@ -8878,21 +8842,68 @@ async_dataset_read_merge_mdset_col(async_instance_t *aid, size_t count, H5VL_asy done: return found_task; -} // End async_dataset_read_merge_mdset_col > 1.13.3 +} // End async_dataset_read_merge_mdset_col #endif -#else + +#ifdef ENABLE_WRITE_MEMCPY +int +is_contig_memspace(hid_t memspace) +{ + hsize_t nblocks; + int ndim; + H5S_sel_type type; + + if (memspace == H5S_SEL_ALL || memspace == H5S_SEL_NONE) { + func_log(__func__, "memspace is SEL_ALL/NONE, is contig"); + return 1; + } + + type = H5Sget_select_type(memspace); + if (type == H5S_SEL_POINTS) { + func_log(__func__, "memspace is SEL_POINTS, not contig"); + return 0; + } + else if (type == H5S_SEL_HYPERSLABS) { + ndim = H5Sget_simple_extent_ndims(memspace); + if (ndim != 1) { + func_log(__func__, "memspace dim > 1, not contig"); + return 0; + } + + nblocks = H5Sget_select_hyper_nblocks(memspace); + if (nblocks == 1) { + func_log(__func__, "nblock = 1, is contig"); + return 1; + } + func_log(__func__, "nblock > 1, not contig"); + return 0; + } + else if (type == H5S_SEL_ALL || type == H5S_SEL_NONE) { + func_log(__func__, "memspace is SEL_ALL/NONE, is contig"); + return 1; + } + else { + func_log_int1(__func__, "unexpected memspace sel type", (int)type); + } + + func_log_int1(__func__, "unknown memspace sel type", (int)type); + + return 0; +} +#endif + static void -async_dataset_read_fn(void *foo) +async_dataset_write_fn(void *foo) { - hbool_t acquired = false; - unsigned int mutex_count = 1; - int attempt_count = 0; - int is_lock = 0; - hbool_t is_lib_state_restored = false; - ABT_pool *pool_ptr; - async_task_t *task = (async_task_t *)foo; - async_dataset_read_args_t *args = (async_dataset_read_args_t *)(task->args); - herr_t status; + hbool_t acquired = false; + unsigned int mutex_count = 1; + int attempt_count = 0; + int is_lock = 0, count = 0; + hbool_t is_lib_state_restored = false; + ABT_pool * pool_ptr; + async_task_t * task = (async_task_t *)foo; + async_dataset_write_args_t *args = (async_dataset_write_args_t *)(task->args); + herr_t status; #ifdef ENABLE_TIMING task->start_time = clock(); @@ -8915,22 +8926,24 @@ async_dataset_read_fn(void *foo) func_log_int1(__func__, "global lock acquired, mutex_count", mutex_count); /* Update the dependent parent object if it is NULL */ - if (NULL == args->dset) { - if (NULL != task->parent_obj->under_object) { - args->dset = task->parent_obj->under_object; - } - else { - if (check_parent_task(task->parent_obj) != 0) { - task->err_stack = H5Ecreate_stack(); - H5Eappend_stack(task->err_stack, task->parent_obj->create_task->err_stack, false); - H5Epush(task->err_stack, __FILE__, __func__, __LINE__, async_error_class_g, H5E_VOL, - H5E_CANTCREATE, "Parent task failed"); + for (size_t i = 0; i < args->count; i++) { + if (NULL == args->dset[i]) { + if (NULL != task->parent_obj->under_object) { + args->dset[i] = task->parent_objs[i]->under_object; + } + else { + if (check_parent_task(task->parent_obj) != 0) { + task->err_stack = H5Ecreate_stack(); + H5Eappend_stack(task->err_stack, task->parent_obj->create_task->err_stack, false); + H5Epush(task->err_stack, __FILE__, __func__, __LINE__, async_error_class_g, H5E_VOL, + H5E_CANTCREATE, "Parent task failed"); #ifdef PRINT_ERROR_STACK - H5Eprint2(task->err_stack, stderr); + H5Eprint2(task->err_stack, stderr); #endif - fprintf(fout_g, " [ ABT ERROR] %s obj is NULL\n", __func__); - goto done; + fprintf(fout_g, " [ ABT ERROR] %s obj is NULL\n", __func__); + goto done; + } } } } @@ -8951,7 +8964,13 @@ async_dataset_read_fn(void *foo) assert(task->async_obj->obj_mutex); assert(task->async_obj->magic == ASYNC_MAGIC); while (1) { + if (count > 10000) { + fprintf(fout_g, " [ ABT DBG] %s cannot acquire object lock in 10s!\n", __func__); + break; + } + if (ABT_mutex_trylock(task->async_obj->obj_mutex) == ABT_SUCCESS) { + is_lock = 1; break; } else { @@ -8959,22 +8978,27 @@ async_dataset_read_fn(void *foo) break; } usleep(1000); + count++; } - is_lock = 1; func_log(__func__, "execute start"); /* Try executing operation, without default error stack handling */ H5E_BEGIN_TRY { - status = H5VLdataset_read(args->dset, task->under_vol_id, args->mem_type_id, args->mem_space_id, - args->file_space_id, args->plist_id, args->buf, NULL); + status = H5VLdataset_write(args->count, args->dset, task->under_vol_id, args->mem_type_id, + args->mem_space_id, args->file_space_id, args->plist_id, + (const void **)args->buf, NULL); check_app_wait(attempt_count + 4, __func__); } H5E_END_TRY if (status < 0) { if ((task->err_stack = H5Eget_current_stack()) < 0) fprintf(fout_g, " [ ABT ERROR] %s H5Eget_current_stack failed\n", __func__); + fprintf(fout_g, " [ ABT LOG] Argobots execute %s failed\n", __func__); +#ifdef PRINT_ERROR_STACK + H5Eprint2(task->err_stack, stderr); +#endif goto done; } @@ -8987,16 +9011,34 @@ async_dataset_read_fn(void *foo) fprintf(fout_g, " [ ABT ERROR] %s H5VLfree_lib_state failed\n", __func__); task->h5_state = NULL; - if (args->mem_type_id > 0) - H5Tclose(args->mem_type_id); - if (args->mem_space_id > H5S_PLIST && args->mem_space_id < H5S_UNLIMITED) - H5Sclose(args->mem_space_id); - if (args->file_space_id > H5S_PLIST && args->file_space_id < H5S_UNLIMITED) - H5Sclose(args->file_space_id); - if (args->plist_id > 0) + for (size_t i = 0; i < args->count; i++) { + if (args->mem_type_id[i] > 0) + H5Tclose(args->mem_type_id[i]); + if (args->mem_space_id[i] > H5S_PLIST && args->mem_space_id[i] < H5S_UNLIMITED) + H5Sclose(args->mem_space_id[i]); + if (args->file_space_id[i] > H5S_PLIST && args->file_space_id[i] < H5S_UNLIMITED) + H5Sclose(args->file_space_id[i]); + } + if (args->mem_type_id) { + free(args->mem_type_id); + args->mem_type_id = NULL; + } + if (args->mem_space_id) { + free(args->mem_space_id); + args->mem_space_id = NULL; + } + if (args->file_space_id) { + free(args->file_space_id); + args->file_space_id = NULL; + } + if (args->plist_id > 0) { H5Pclose(args->plist_id); - free(args); - task->args = NULL; + args->plist_id = 0; + } + if (args->dset) { + free(args->dset); + args->dset = NULL; + } if (is_lock == 1) { if (ABT_mutex_unlock(task->async_obj->obj_mutex) != ABT_SUCCESS) @@ -9005,7 +9047,7 @@ async_dataset_read_fn(void *foo) ABT_eventual_set(task->eventual, NULL, 0); task->in_abt_pool = 0; - task->is_done = 1; + task->is_done = 1; /* remove_task_from_queue(task, __func__); */ @@ -9016,36 +9058,63 @@ async_dataset_read_fn(void *foo) } if (async_instance_g && NULL != async_instance_g->qhead.queue && async_instance_g->start_abt_push) push_task_to_abt_pool(&async_instance_g->qhead, *pool_ptr, __func__); + +#ifdef ENABLE_WRITE_MEMCPY + if (args->free_buf && args->buf) { + for (size_t i = 0; i < args->count; i++) { + free(args->buf[i]); + async_instance_g->used_mem -= args->data_size; + } + func_log(__func__, "released dset memcpy"); + } +#endif + if (task->args) { + free(task->args); + task->args = NULL; + } + #ifdef ENABLE_TIMING task->end_time = clock(); #endif func_leave(__func__); return; -} // End async_dataset_read_fn < 1.13.3 +} // End async_dataset_write_fn static herr_t -async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_type_id, hid_t mem_space_id, - hid_t file_space_id, hid_t plist_id, void *buf, void **req) +async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_obj, hid_t mem_type_id[], + hid_t mem_space_id[], hid_t file_space_id[], hid_t plist_id, const void **buf, void **req) { - // For implicit mode (env var), make all read to be blocking - assert(async_instance_g); - async_task_t *async_task = NULL; - async_dataset_read_args_t *args = NULL; - bool lock_parent = false; - bool is_blocking = false; - hbool_t acquired = false; - unsigned int mutex_count = 1; + async_task_t * async_task = NULL; + async_dataset_write_args_t *args = NULL; + bool lock_parent = false; + bool is_blocking = false; + hbool_t acquired = false; + unsigned int mutex_count = 1; +#ifdef MPI_VERSION + H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_INDEPENDENT; +#endif func_enter(__func__, NULL); assert(aid); assert(parent_obj); - assert(parent_obj->magic == ASYNC_MAGIC); + assert(parent_obj[0]->magic == ASYNC_MAGIC); + + for (size_t i = 0; i < count; i++) { + if (mem_type_id[i] < 0) + goto error; + if (mem_space_id[i] < 0) + goto error; + if (file_space_id[i] < 0) + goto error; + /* if (buf[i] == NULL) */ + /* goto error; */ + } async_instance_g->prev_push_state = async_instance_g->start_abt_push; - if ((args = (async_dataset_read_args_t *)calloc(1, sizeof(async_dataset_read_args_t))) == NULL) { + if ((args = (async_dataset_write_args_t *)calloc(1, sizeof(async_dataset_write_args_t))) == NULL) { fprintf(fout_g, " [ASYNC VOL ERROR] %s with calloc\n", __func__); goto error; } @@ -9058,37 +9127,121 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty #ifdef ENABLE_TIMING async_task->create_time = clock(); #endif - args->dset = parent_obj->under_object; - if (mem_type_id > 0) - args->mem_type_id = H5Tcopy(mem_type_id); - if (mem_space_id > H5S_PLIST && mem_space_id < H5S_UNLIMITED) - args->mem_space_id = H5Scopy(mem_space_id); - else - args->mem_space_id = mem_space_id; - if (file_space_id > H5S_PLIST && file_space_id < H5S_UNLIMITED) - args->file_space_id = H5Scopy(file_space_id); - else - args->file_space_id = file_space_id; + args->dset = (void **)calloc(count, sizeof(void *)); + args->buf = (void **)calloc(count, sizeof(void *)); + args->mem_type_id = (hid_t *)calloc(count, sizeof(hid_t)); + args->mem_space_id = (hid_t *)calloc(count, sizeof(hid_t)); + args->file_space_id = (hid_t *)calloc(count, sizeof(hid_t)); + for (size_t i = 0; i < count; i++) { + args->dset[i] = parent_obj[i]->under_object; + if (mem_type_id[i] > 0) + args->mem_type_id[i] = H5Tcopy(mem_type_id[i]); + else + args->mem_type_id[i] = mem_type_id[i]; + if (mem_space_id[i] > H5S_PLIST && mem_space_id[i] < H5S_UNLIMITED) + args->mem_space_id[i] = H5Scopy(mem_space_id[i]); + else + args->mem_space_id[i] = mem_space_id[i]; + if (file_space_id[i] > H5S_PLIST && file_space_id[i] < H5S_UNLIMITED) + args->file_space_id[i] = H5Scopy(file_space_id[i]); + else + args->file_space_id[i] = file_space_id[i]; + args->buf[i] = (void *)buf[i]; + } if (plist_id > 0) args->plist_id = H5Pcopy(plist_id); - args->buf = buf; - args->req = req; + args->req = req; + args->count = count; - if (req) { - H5VL_async_t *new_req; - if ((new_req = H5VL_async_new_obj(NULL, parent_obj->under_vol_id)) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with request object calloc\n", __func__); - goto error; +#ifdef ENABLE_WRITE_MEMCPY + hsize_t buf_size = 0; + for (size_t i = 0; i < count; i++) { + func_log_int1(__func__, "start write memcpy iter", i); + if (parent_obj[i]->data_size > 0 && + (args->file_space_id[i] == H5S_ALL || args->mem_space_id[i] == H5S_ALL)) { + buf_size = parent_obj[i]->data_size; + } + else { + buf_size = H5Tget_size(mem_type_id[i]) * H5Sget_select_npoints(mem_space_id[i]); + if (buf_size == 0) + fprintf(fout_g, " [ASYNC VOL INFO] %s dataset size is 0\n", __func__); } - new_req->my_task = async_task; - /* new_req->under_object = new_req; */ - new_req->file_async_obj = parent_obj->file_async_obj; - *req = (void *)new_req; - } - else { - is_blocking = true; - async_instance_g->start_abt_push = true; - } + + /* fprintf(fout_g, "buf size = %llu\n", buf_size); */ + + // Get available system memory + hsize_t avail_mem = (hsize_t)get_avphys_pages() * sysconf(_SC_PAGESIZE); + func_log_uint64_1(__func__, "system available memory bytes", avail_mem); + func_log_uint64_1(__func__, "vol-async used memory bytes", async_instance_g->used_mem); + func_log_uint64_1(__func__, "vol-async max memory bytes", async_instance_g->max_mem); + + if (async_instance_g->used_mem + buf_size > async_instance_g->max_mem) { + is_blocking = true; + fprintf(fout_g, + " [ ABT INFO] %d write size %lu larger than async memory limit %lu, switch to " + "synchronous write\n", + async_instance_g->mpi_rank, buf_size, async_instance_g->max_mem); + } + else if (buf_size > avail_mem) { + is_blocking = true; + fprintf(fout_g, + " [ ABT INFO] %d write size %lu larger than available memory %lu, switch to " + "synchronous write\n", + async_instance_g->mpi_rank, buf_size, avail_mem); + } + else if (buf_size > 0) { + func_log_uint64_1(__func__, "allocate double memory bytes", buf_size); + if (NULL == (args->buf[i] = malloc(buf_size))) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s malloc failed!\n", __func__); + goto done; + } + async_instance_g->used_mem += buf_size; + args->free_buf = true; + args->data_size += buf_size; + + // If is contiguous space, no need to go through gather process as it can be costly + if (1 != is_contig_memspace(mem_space_id[i])) { + func_log(__func__, "memspace is not contiguous, need H5Dgather"); + if (H5Dgather(mem_space_id[i], buf[i], mem_type_id[i], buf_size, args->buf[i], NULL, NULL) < + 0) + fprintf(fout_g, " [ASYNC VOL ERROR] %s H5Dgather failed!\n", __func__); + + hsize_t elem_size = H5Tget_size(mem_type_id[i]); + if (elem_size == 0) + elem_size = 1; + hsize_t n_elem = (hsize_t)(buf_size / elem_size); + if (args->mem_space_id[i] > 0) + H5Sclose(args->mem_space_id[i]); + args->mem_space_id[i] = H5Screate_simple(1, &n_elem, NULL); + } + else { + func_log(__func__, "memspace contiguous, memcpy directly"); + memcpy(args->buf[i], buf[i], buf_size); + } + } + else { + func_log(__func__, "write size is 0, no memcpy"); + } + func_log_int1(__func__, "end write memcpy iter", i); + } +#endif + + if (req) { + H5VL_async_t *new_req; + if ((new_req = H5VL_async_new_obj(NULL, parent_obj[0]->under_vol_id)) == NULL) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with request object calloc\n", __func__); + goto error; + } + new_req->my_task = async_task; + /* new_req->under_object = new_req; */ + new_req->file_async_obj = parent_obj[0]->file_async_obj; + *req = (void *)new_req; + } + // Comment out the code below to allow implicit mode async dset write + else { + is_blocking = true; + async_instance_g->start_abt_push = true; + } // Retrieve current library state if (H5VLretrieve_lib_state(&async_task->h5_state) < 0) { @@ -9096,16 +9249,19 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty goto done; } - async_task->func = async_dataset_read_fn; - async_task->args = args; - async_task->op = READ; - async_task->under_vol_id = parent_obj->under_vol_id; - async_task->async_obj = parent_obj; - async_task->parent_obj = parent_obj; + async_task->func = async_dataset_write_fn; + async_task->args = args; + async_task->op = WRITE; + async_task->under_vol_id = parent_obj[0]->under_vol_id; + async_task->async_obj = parent_obj[0]; + async_task->parent_obj = parent_obj[0]; + async_task->parent_objs = (struct H5VL_async_t **)calloc(count, sizeof(struct H5VL_async_t *)); + for (size_t i = 0; i < count; i++) + async_task->parent_objs[i] = parent_obj[i]; /* Lock parent_obj */ while (1) { - if (parent_obj->obj_mutex && ABT_mutex_trylock(parent_obj->obj_mutex) == ABT_SUCCESS) { + if (parent_obj[0]->obj_mutex && ABT_mutex_trylock(parent_obj[0]->obj_mutex) == ABT_SUCCESS) { lock_parent = true; break; } @@ -9123,12 +9279,22 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty func_log_int1(__func__, "acquired global lock, count", mutex_count); } - parent_obj->task_cnt++; - parent_obj->pool_ptr = &aid->pool; + parent_obj[0]->task_cnt++; + parent_obj[0]->pool_ptr = &aid->pool; + +#ifdef MPI_VERSION + H5Pget_dxpl_mpio(plist_id, &xfer_mode); +#endif + /* Check if its parent has valid object */ - if (NULL == parent_obj->under_object) { - if (NULL != parent_obj->create_task) { - add_task_to_queue(&aid->qhead, async_task, DEPENDENT); + if (NULL == parent_obj[0]->under_object) { + if (NULL != parent_obj[0]->create_task) { +#ifdef MPI_VERSION + if (xfer_mode == H5FD_MPIO_COLLECTIVE) + add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); + else +#endif + add_task_to_queue(&aid->qhead, async_task, DEPENDENT); } else { fprintf(fout_g, " [ASYNC VOL ERROR] %s parent task not created\n", __func__); @@ -9137,8 +9303,6 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty } else { #ifdef MPI_VERSION - H5FD_mpio_xfer_t xfer_mode; - H5Pget_dxpl_mpio(plist_id, &xfer_mode); if (xfer_mode == H5FD_MPIO_COLLECTIVE) add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); else @@ -9146,7 +9310,7 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty add_task_to_queue(&aid->qhead, async_task, REGULAR); } - if (ABT_mutex_unlock(parent_obj->obj_mutex) != ABT_SUCCESS) { + if (ABT_mutex_unlock(parent_obj[0]->obj_mutex) != ABT_SUCCESS) { fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); goto error; } @@ -9174,1106 +9338,215 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty async_instance_g->start_abt_push = async_instance_g->prev_push_state; if (lock_parent) { - if (ABT_mutex_unlock(parent_obj->obj_mutex) != ABT_SUCCESS) + if (ABT_mutex_unlock(parent_obj[0]->obj_mutex) != ABT_SUCCESS) fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); } if (async_task && async_task->args) { - free(async_task->args); - if (async_task) - async_task->args = NULL; + for (size_t i = 0; i < count; i++) { + if (args->mem_type_id && args->mem_type_id[i] > 0) + H5Tclose(args->mem_type_id[i]); + if (args->mem_space_id && args->mem_space_id[i] > H5S_PLIST && + args->mem_space_id[i] < H5S_UNLIMITED) + H5Sclose(args->mem_space_id[i]); + if (args->file_space_id && args->file_space_id[i] > H5S_PLIST && + args->file_space_id[i] < H5S_UNLIMITED) + H5Sclose(args->file_space_id[i]); + } + if (args->plist_id > 0) + H5Pclose(args->plist_id); + + if (args->dset) + free(args->dset); + if (args->buf) + free(args->buf); + if (args->mem_type_id) + free(mem_type_id); + if (args->mem_space_id) + free(mem_space_id); + if (args->file_space_id) + free(file_space_id); + free(args); } + if (async_task) + async_task->args = NULL; return -1; -} // End async_dataset_read < 1.13.3 -#endif +} // End async_dataset_write -#ifdef ENABLE_WRITE_MEMCPY -int -is_contig_memspace(hid_t memspace) +#ifdef ENABLE_MERGE_DSET +// Check and merge current write into an existing one in queue, must be collective +static herr_t +async_dataset_write_merge_mdset_col(async_instance_t *aid, size_t count, H5VL_async_t **parent_obj, + hid_t mem_type_id[], hid_t mem_space_id[], hid_t file_space_id[], + hid_t plist_id, const void **buf, void **req) { - hsize_t nblocks; - int ndim; - H5S_sel_type type; + async_task_t * task_elt; + async_dataset_write_args_t *iter_args = NULL; + int found_task = 0, iter_cnt = 0, total_cnt = 0, is_first = 1; - if (memspace == H5S_SEL_ALL || memspace == H5S_SEL_NONE) { - func_log(__func__, "memspace is SEL_ALL/NONE, is contig"); - return 1; - } + func_enter(__func__, NULL); - type = H5Sget_select_type(memspace); - if (type == H5S_SEL_POINTS) { - func_log(__func__, "memspace is SEL_POINTS, not contig"); - return 0; - } - else if (type == H5S_SEL_HYPERSLABS) { - ndim = H5Sget_simple_extent_ndims(memspace); - if (ndim != 1) { - func_log(__func__, "memspace dim > 1, not contig"); - return 0; - } + assert(aid); + assert(parent_obj); + assert(parent_obj[0]->magic == ASYNC_MAGIC); - nblocks = H5Sget_select_hyper_nblocks(memspace); - if (nblocks == 1) { - func_log(__func__, "nblock = 1, is contig"); - return 1; - } - func_log(__func__, "nblock > 1, not contig"); - return 0; - } - else if (type == H5S_SEL_ALL || type == H5S_SEL_NONE) { - func_log(__func__, "memspace is SEL_ALL/NONE, is contig"); - return 1; - } - else { - func_log_int1(__func__, "unexpected memspace sel type", (int)type); + if (NULL == aid->qhead.queue) { + func_log(__func__, "qhead->queue is NULL"); + goto done; } - func_log_int1(__func__, "unknown memspace sel type", (int)type); - - return 0; -} -#endif - -#if H5_VERSION_GE(1, 13, 3) -static void -async_dataset_write_fn(void *foo) -{ - hbool_t acquired = false; - unsigned int mutex_count = 1; - int attempt_count = 0; - int is_lock = 0, count = 0; - hbool_t is_lib_state_restored = false; - ABT_pool * pool_ptr; - async_task_t * task = (async_task_t *)foo; - async_dataset_write_args_t *args = (async_dataset_write_args_t *)(task->args); - herr_t status; - -#ifdef ENABLE_TIMING - task->start_time = clock(); -#endif - - func_enter(__func__, NULL); - - assert(args); - assert(task); - assert(task->async_obj); - assert(task->async_obj->magic == ASYNC_MAGIC); + if (ABT_mutex_lock(aid->qhead.head_mutex) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_lock\n", __func__); + return -1; + } - pool_ptr = task->async_obj->pool_ptr; + // Reverse iter task list + DL_FOREACH2(aid->qhead.queue, task_elt, prev) + { + // Break out when done reverse iteration + if (is_first == 0 && task_elt == aid->qhead.queue) + break; - func_log(__func__, "trying to acquire global lock"); + if (task_elt->type != COLLECTIVE) + continue; - if ((attempt_count = check_app_acquire_mutex_fn(task, &mutex_count, &acquired)) < 0) - goto done; + // Must be same file and a dset write task + if (task_elt->async_obj->file_async_obj == parent_obj[0]->file_async_obj && + task_elt->func == async_dataset_write_fn) { + // append current write to existing multi dset write + iter_args = task_elt->args; + iter_cnt = iter_args->count; + total_cnt = iter_cnt + count; - func_log_int1(__func__, "global lock acquired, mutex_count", mutex_count); + if (plist_id > 0) { + if (H5Pequal(iter_args->plist_id, plist_id) <= 0) { + func_log(__func__, "dxpl is not the same, cannot merge to multi-dset write"); + continue; + } + } - /* Update the dependent parent object if it is NULL */ - for (size_t i = 0; i < args->count; i++) { - if (NULL == args->dset[i]) { - if (NULL != task->parent_obj->under_object) { - args->dset[i] = task->parent_objs[i]->under_object; - } - else { - if (check_parent_task(task->parent_obj) != 0) { - task->err_stack = H5Ecreate_stack(); - H5Eappend_stack(task->err_stack, task->parent_obj->create_task->err_stack, false); - H5Epush(task->err_stack, __FILE__, __func__, __LINE__, async_error_class_g, H5E_VOL, - H5E_CANTCREATE, "Parent task failed"); - -#ifdef PRINT_ERROR_STACK - H5Eprint2(task->err_stack, stderr); -#endif - fprintf(fout_g, " [ ABT ERROR] %s obj is NULL\n", __func__); - goto done; - } - } - } - } - - // Restore previous library state - assert(task->h5_state); - if (H5VLOPEN_LIB_CONTEXT(&task->h5_context) < 0) { - fprintf(fout_g, " [ ABT ERROR] %s %s failed\n", __func__, FUNC_H5VLOPEN_LIB_CONTEXT); - goto done; - } - if (H5VLrestore_lib_state(task->h5_state) < 0) { - fprintf(fout_g, " [ ABT ERROR] %s H5VLrestore_lib_state failed\n", __func__); - goto done; - } - is_lib_state_restored = true; - - /* Acquire async obj mutex and set the obj */ - assert(task->async_obj->obj_mutex); - assert(task->async_obj->magic == ASYNC_MAGIC); - while (1) { - if (count > 10000) { - fprintf(fout_g, " [ ABT DBG] %s cannot acquire object lock in 10s!\n", __func__); - break; - } - - if (ABT_mutex_trylock(task->async_obj->obj_mutex) == ABT_SUCCESS) { - is_lock = 1; - break; - } - else { - fprintf(fout_g, " [ ABT DBG] %s error with try_lock\n", __func__); - break; - } - usleep(1000); - count++; - } - - func_log(__func__, "execute start"); - - /* Try executing operation, without default error stack handling */ - H5E_BEGIN_TRY - { - status = H5VLdataset_write(args->count, args->dset, task->under_vol_id, args->mem_type_id, - args->mem_space_id, args->file_space_id, args->plist_id, - (const void **)args->buf, NULL); - check_app_wait(attempt_count + 4, __func__); - } - H5E_END_TRY - if (status < 0) { - if ((task->err_stack = H5Eget_current_stack()) < 0) - fprintf(fout_g, " [ ABT ERROR] %s H5Eget_current_stack failed\n", __func__); - fprintf(fout_g, " [ ABT LOG] Argobots execute %s failed\n", __func__); -#ifdef PRINT_ERROR_STACK - H5Eprint2(task->err_stack, stderr); -#endif - goto done; - } - - func_log(__func__, "execute success"); - -done: - if (is_lib_state_restored && H5VLCLOSE_LIB_CONTEXT(task->h5_context) < 0) - fprintf(fout_g, " [ ABT ERROR] %s %s failed\n", __func__, FUNC_H5VLCLOSE_LIB_CONTEXT); - if (NULL != task->h5_state && H5VLfree_lib_state(task->h5_state) < 0) - fprintf(fout_g, " [ ABT ERROR] %s H5VLfree_lib_state failed\n", __func__); - task->h5_state = NULL; - - for (size_t i = 0; i < args->count; i++) { - if (args->mem_type_id[i] > 0) - H5Tclose(args->mem_type_id[i]); - if (args->mem_space_id[i] > H5S_PLIST && args->mem_space_id[i] < H5S_UNLIMITED) - H5Sclose(args->mem_space_id[i]); - if (args->file_space_id[i] > H5S_PLIST && args->file_space_id[i] < H5S_UNLIMITED) - H5Sclose(args->file_space_id[i]); - } - if (args->mem_type_id) { - free(args->mem_type_id); - args->mem_type_id = NULL; - } - if (args->mem_space_id) { - free(args->mem_space_id); - args->mem_space_id = NULL; - } - if (args->file_space_id) { - free(args->file_space_id); - args->file_space_id = NULL; - } - if (args->plist_id > 0) { - H5Pclose(args->plist_id); - args->plist_id = 0; - } - if (args->dset) { - free(args->dset); - args->dset = NULL; - } - - if (is_lock == 1) { - if (ABT_mutex_unlock(task->async_obj->obj_mutex) != ABT_SUCCESS) - fprintf(fout_g, " [ ABT ERROR] %s ABT_mutex_unlock failed\n", __func__); - } - - ABT_eventual_set(task->eventual, NULL, 0); - task->in_abt_pool = 0; - task->is_done = 1; - - /* remove_task_from_queue(task, __func__); */ - - func_log(__func__, "release global lock"); - - if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ ABT ERROR] %s H5TSmutex_release failed\n", __func__); - } - if (async_instance_g && NULL != async_instance_g->qhead.queue && async_instance_g->start_abt_push) - push_task_to_abt_pool(&async_instance_g->qhead, *pool_ptr, __func__); - -#ifdef ENABLE_WRITE_MEMCPY - if (args->free_buf && args->buf) { - for (size_t i = 0; i < args->count; i++) { - free(args->buf[i]); - async_instance_g->used_mem -= args->data_size; - } - func_log(__func__, "released dset memcpy"); - } -#endif - if (task->args) { - free(task->args); - task->args = NULL; - } - -#ifdef ENABLE_TIMING - task->end_time = clock(); -#endif - func_leave(__func__); - - return; -} // End async_dataset_write_fn > 1.13.3 - -static herr_t -async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_obj, hid_t mem_type_id[], - hid_t mem_space_id[], hid_t file_space_id[], hid_t plist_id, const void **buf, void **req) -{ - async_task_t * async_task = NULL; - async_dataset_write_args_t *args = NULL; - bool lock_parent = false; - bool is_blocking = false; - hbool_t acquired = false; - unsigned int mutex_count = 1; -#ifdef MPI_VERSION - H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_INDEPENDENT; -#endif - - func_enter(__func__, NULL); - - assert(aid); - assert(parent_obj); - assert(parent_obj[0]->magic == ASYNC_MAGIC); - - for (size_t i = 0; i < count; i++) { - if (mem_type_id[i] < 0) - goto error; - if (mem_space_id[i] < 0) - goto error; - if (file_space_id[i] < 0) - goto error; - /* if (buf[i] == NULL) */ - /* goto error; */ - } - - async_instance_g->prev_push_state = async_instance_g->start_abt_push; - - if ((args = (async_dataset_write_args_t *)calloc(1, sizeof(async_dataset_write_args_t))) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with calloc\n", __func__); - goto error; - } - /* create a new task and insert into its file task list */ - if ((async_task = create_async_task()) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with calloc\n", __func__); - goto error; - } - -#ifdef ENABLE_TIMING - async_task->create_time = clock(); -#endif - args->dset = (void **)calloc(count, sizeof(void *)); - args->buf = (void **)calloc(count, sizeof(void *)); - args->mem_type_id = (hid_t *)calloc(count, sizeof(hid_t)); - args->mem_space_id = (hid_t *)calloc(count, sizeof(hid_t)); - args->file_space_id = (hid_t *)calloc(count, sizeof(hid_t)); - for (size_t i = 0; i < count; i++) { - args->dset[i] = parent_obj[i]->under_object; - if (mem_type_id[i] > 0) - args->mem_type_id[i] = H5Tcopy(mem_type_id[i]); - else - args->mem_type_id[i] = mem_type_id[i]; - if (mem_space_id[i] > H5S_PLIST && mem_space_id[i] < H5S_UNLIMITED) - args->mem_space_id[i] = H5Scopy(mem_space_id[i]); - else - args->mem_space_id[i] = mem_space_id[i]; - if (file_space_id[i] > H5S_PLIST && file_space_id[i] < H5S_UNLIMITED) - args->file_space_id[i] = H5Scopy(file_space_id[i]); - else - args->file_space_id[i] = file_space_id[i]; - args->buf[i] = (void *)buf[i]; - } - if (plist_id > 0) - args->plist_id = H5Pcopy(plist_id); - args->req = req; - args->count = count; - -#ifdef ENABLE_WRITE_MEMCPY - hsize_t buf_size = 0; - for (size_t i = 0; i < count; i++) { - func_log_int1(__func__, "start write memcpy iter", i); - if (parent_obj[i]->data_size > 0 && - (args->file_space_id[i] == H5S_ALL || args->mem_space_id[i] == H5S_ALL)) { - buf_size = parent_obj[i]->data_size; - } - else { - buf_size = H5Tget_size(mem_type_id[i]) * H5Sget_select_npoints(mem_space_id[i]); - if (buf_size == 0) - fprintf(fout_g, " [ASYNC VOL INFO] %s dataset size is 0\n", __func__); - } - - /* fprintf(fout_g, "buf size = %llu\n", buf_size); */ - - // Get available system memory - hsize_t avail_mem = (hsize_t)get_avphys_pages() * sysconf(_SC_PAGESIZE); - func_log_uint64_1(__func__, "system available memory bytes", avail_mem); - func_log_uint64_1(__func__, "vol-async used memory bytes", async_instance_g->used_mem); - func_log_uint64_1(__func__, "vol-async max memory bytes", async_instance_g->max_mem); - - if (async_instance_g->used_mem + buf_size > async_instance_g->max_mem) { - is_blocking = true; - fprintf(fout_g, - " [ ABT INFO] %d write size %lu larger than async memory limit %lu, switch to " - "synchronous write\n", - async_instance_g->mpi_rank, buf_size, async_instance_g->max_mem); - } - else if (buf_size > avail_mem) { - is_blocking = true; - fprintf(fout_g, - " [ ABT INFO] %d write size %lu larger than available memory %lu, switch to " - "synchronous write\n", - async_instance_g->mpi_rank, buf_size, avail_mem); - } - else if (buf_size > 0) { - func_log_uint64_1(__func__, "allocate double memory bytes", buf_size); - if (NULL == (args->buf[i] = malloc(buf_size))) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s malloc failed!\n", __func__); - goto done; - } - async_instance_g->used_mem += buf_size; - args->free_buf = true; - args->data_size += buf_size; - - // If is contiguous space, no need to go through gather process as it can be costly - if (1 != is_contig_memspace(mem_space_id[i])) { - func_log(__func__, "memspace is not contiguous, need H5Dgather"); - if (H5Dgather(mem_space_id[i], buf[i], mem_type_id[i], buf_size, args->buf[i], NULL, NULL) < - 0) - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5Dgather failed!\n", __func__); - - hsize_t elem_size = H5Tget_size(mem_type_id[i]); - if (elem_size == 0) - elem_size = 1; - hsize_t n_elem = (hsize_t)(buf_size / elem_size); - if (args->mem_space_id[i] > 0) - H5Sclose(args->mem_space_id[i]); - args->mem_space_id[i] = H5Screate_simple(1, &n_elem, NULL); - } - else { - func_log(__func__, "memspace contiguous, memcpy directly"); - memcpy(args->buf[i], buf[i], buf_size); - } - } - else { - func_log(__func__, "write size is 0, no memcpy"); - } - func_log_int1(__func__, "end write memcpy iter", i); - } -#endif - - if (req) { - H5VL_async_t *new_req; - if ((new_req = H5VL_async_new_obj(NULL, parent_obj[0]->under_vol_id)) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with request object calloc\n", __func__); - goto error; - } - new_req->my_task = async_task; - /* new_req->under_object = new_req; */ - new_req->file_async_obj = parent_obj[0]->file_async_obj; - *req = (void *)new_req; - } - // Comment out the code below to allow implicit mode async dset write - else { - is_blocking = true; - async_instance_g->start_abt_push = true; - } - - // Retrieve current library state - if (H5VLretrieve_lib_state(&async_task->h5_state) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5VLretrieve_lib_state failed\n", __func__); - goto done; - } - - async_task->func = async_dataset_write_fn; - async_task->args = args; - async_task->op = WRITE; - async_task->under_vol_id = parent_obj[0]->under_vol_id; - async_task->async_obj = parent_obj[0]; - async_task->parent_obj = parent_obj[0]; - async_task->parent_objs = (struct H5VL_async_t **)calloc(count, sizeof(struct H5VL_async_t *)); - for (size_t i = 0; i < count; i++) - async_task->parent_objs[i] = parent_obj[i]; - - /* Lock parent_obj */ - while (1) { - if (parent_obj[0]->obj_mutex && ABT_mutex_trylock(parent_obj[0]->obj_mutex) == ABT_SUCCESS) { - lock_parent = true; - break; - } - // Temp release global lock in case background is waiting - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - usleep(1000); - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto error; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - } - - parent_obj[0]->task_cnt++; - parent_obj[0]->pool_ptr = &aid->pool; - -#ifdef MPI_VERSION - H5Pget_dxpl_mpio(plist_id, &xfer_mode); -#endif - - /* Check if its parent has valid object */ - if (NULL == parent_obj[0]->under_object) { - if (NULL != parent_obj[0]->create_task) { -#ifdef MPI_VERSION - if (xfer_mode == H5FD_MPIO_COLLECTIVE) - add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); - else -#endif - add_task_to_queue(&aid->qhead, async_task, DEPENDENT); - } - else { - fprintf(fout_g, " [ASYNC VOL ERROR] %s parent task not created\n", __func__); - goto error; - } - } - else { -#ifdef MPI_VERSION - if (xfer_mode == H5FD_MPIO_COLLECTIVE) - add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); - else -#endif - add_task_to_queue(&aid->qhead, async_task, REGULAR); - } - - if (ABT_mutex_unlock(parent_obj[0]->obj_mutex) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); - goto error; - } - lock_parent = false; - if (aid->ex_delay == false && !async_instance_g->pause) { - if (get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - } - - if (is_blocking) { - if (block_and_wait_task(async_task, __func__) < 0) - goto error; - } - - // Restore async operation state - async_instance_g->start_abt_push = async_instance_g->prev_push_state; - - func_leave(__func__); - -done: - return 0; -error: - fprintf(fout_g, " [ASYNC VOL ERROR] %s\n", __func__); - // Restore async operation state - async_instance_g->start_abt_push = async_instance_g->prev_push_state; - - if (lock_parent) { - if (ABT_mutex_unlock(parent_obj[0]->obj_mutex) != ABT_SUCCESS) - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); - } - if (async_task && async_task->args) { - for (size_t i = 0; i < count; i++) { - if (args->mem_type_id && args->mem_type_id[i] > 0) - H5Tclose(args->mem_type_id[i]); - if (args->mem_space_id && args->mem_space_id[i] > H5S_PLIST && - args->mem_space_id[i] < H5S_UNLIMITED) - H5Sclose(args->mem_space_id[i]); - if (args->file_space_id && args->file_space_id[i] > H5S_PLIST && - args->file_space_id[i] < H5S_UNLIMITED) - H5Sclose(args->file_space_id[i]); - } - if (args->plist_id > 0) - H5Pclose(args->plist_id); - - if (args->dset) - free(args->dset); - if (args->buf) - free(args->buf); - if (args->mem_type_id) - free(mem_type_id); - if (args->mem_space_id) - free(mem_space_id); - if (args->file_space_id) - free(file_space_id); - free(args); - } - if (async_task) - async_task->args = NULL; - return -1; -} // End async_dataset_write > 1.13.3 - -#ifdef ENABLE_MERGE_DSET -// Check and merge current write into an existing one in queue, must be collective -static herr_t -async_dataset_write_merge_mdset_col(async_instance_t *aid, size_t count, H5VL_async_t **parent_obj, - hid_t mem_type_id[], hid_t mem_space_id[], hid_t file_space_id[], - hid_t plist_id, const void **buf, void **req) -{ - async_task_t * task_elt; - async_dataset_write_args_t *iter_args = NULL; - int found_task = 0, iter_cnt = 0, total_cnt = 0, is_first = 1; - - func_enter(__func__, NULL); - - assert(aid); - assert(parent_obj); - assert(parent_obj[0]->magic == ASYNC_MAGIC); - - if (NULL == aid->qhead.queue) { - func_log(__func__, "qhead->queue is NULL"); - goto done; - } - - if (ABT_mutex_lock(aid->qhead.head_mutex) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_lock\n", __func__); - return -1; - } - - // Reverse iter task list - DL_FOREACH2(aid->qhead.queue, task_elt, prev) - { - // Break out when done reverse iteration - if (is_first == 0 && task_elt == aid->qhead.queue) - break; - - if (task_elt->type != COLLECTIVE) - continue; - - // Must be same file and a dset write task - if (task_elt->async_obj->file_async_obj == parent_obj[0]->file_async_obj && - task_elt->func == async_dataset_write_fn) { - // append current write to existing multi dset write - iter_args = task_elt->args; - iter_cnt = iter_args->count; - total_cnt = iter_cnt + count; - - if (plist_id > 0) { - if (H5Pequal(iter_args->plist_id, plist_id) <= 0) { - func_log(__func__, "dxpl is not the same, cannot merge to multi-dset write"); - continue; - } - } - - // Realloc and fill the args with current write - iter_args->dset = (void **)realloc(iter_args->dset, total_cnt * sizeof(void *)); - iter_args->buf = (void **)realloc(iter_args->buf, total_cnt * sizeof(void *)); - iter_args->mem_type_id = (hid_t *)realloc(iter_args->mem_type_id, total_cnt * sizeof(hid_t)); - iter_args->mem_space_id = (hid_t *)realloc(iter_args->mem_space_id, total_cnt * sizeof(hid_t)); - iter_args->file_space_id = (hid_t *)realloc(iter_args->file_space_id, total_cnt * sizeof(hid_t)); - for (size_t i = iter_cnt; i < total_cnt; i++) { - iter_args->dset[i] = parent_obj[i - iter_cnt]->under_object; - if (mem_type_id[i - iter_cnt] > 0) - iter_args->mem_type_id[i] = H5Tcopy(mem_type_id[i - iter_cnt]); - if (mem_space_id[i - iter_cnt] > H5S_PLIST && mem_space_id[i - iter_cnt] < H5S_UNLIMITED) - iter_args->mem_space_id[i] = H5Scopy(mem_space_id[i - iter_cnt]); - else - iter_args->mem_space_id[i] = mem_space_id[i - iter_cnt]; - if (file_space_id[i - iter_cnt] > H5S_PLIST && file_space_id[i - iter_cnt] < H5S_UNLIMITED) - iter_args->file_space_id[i] = H5Scopy(file_space_id[i - iter_cnt]); - else - iter_args->file_space_id[i] = file_space_id[i - iter_cnt]; - iter_args->buf[i] = (void *)buf[i - iter_cnt]; - } - // Replace with the new req - // TODO: what to do with old req? - iter_args->req = req; - iter_args->count = total_cnt; - - task_elt->parent_objs = (struct H5VL_async_t **)realloc( - task_elt->parent_objs, total_cnt * sizeof(struct H5VL_async_t *)); - for (size_t i = iter_cnt; i < total_cnt; i++) - task_elt->parent_objs[i] = parent_obj[i - iter_cnt]; - -#ifdef ENABLE_WRITE_MEMCPY - hsize_t buf_size = 0; - for (size_t i = iter_cnt; i < total_cnt; i++) { - if (parent_obj[i]->data_size > 0 && - (iter_args->file_space_id[i] == H5S_ALL || iter_args->mem_space_id[i] == H5S_ALL)) { - buf_size = parent_obj[i]->data_size; - } - else { - buf_size = H5Tget_size(mem_type_id[i]) * H5Sget_select_npoints(mem_space_id[i]); -#ifdef ENABLE_DBG_MSG - if (buf_size == 0) - fprintf(fout_g, " [ASYNC VOL ERROR] %s with getting dataset size\n", __func__); -#endif - } - - /* fprintf(fout_g, "buf size = %llu\n", buf_size); */ - - // Get available system memory - hsize_t avail_mem = (hsize_t)get_avphys_pages() * sysconf(_SC_PAGESIZE); - - if (async_instance_g->used_mem + buf_size > async_instance_g->max_mem) { - fprintf(fout_g, - " [ ABT INFO] %d write size %lu larger than async memory limit " - "%lu, switch to " - "synchronous write\n", - async_instance_g->mpi_rank, buf_size, async_instance_g->max_mem); - } - else if (buf_size > avail_mem) { - fprintf(fout_g, - " [ ABT INFO] %d write size %lu larger than available memory %lu, " - "switch to " - "synchronous write\n", - async_instance_g->mpi_rank, buf_size, avail_mem); - } - else if (buf_size > 0) { - if (NULL == (iter_args->buf[i] = malloc(buf_size))) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s malloc failed!\n", __func__); - goto done; - } - async_instance_g->used_mem += buf_size; - iter_args->free_buf = true; - iter_args->data_size += buf_size; - - // If is contiguous space, no need to go through gather process as it can be - // costly - if (1 != is_contig_memspace(mem_space_id[i])) { - /* fprintf(fout_g," [ASYNC VOL LOG] %s will gather!\n", __func__); */ - H5Dgather(mem_space_id[i], buf[i], mem_type_id[i], buf_size, iter_args->buf[i], NULL, - NULL); - hsize_t elem_size = H5Tget_size(mem_type_id[i]); - if (elem_size == 0) - elem_size = 1; - hsize_t n_elem = (hsize_t)(buf_size / elem_size); - if (iter_args->mem_space_id[i] > 0) - H5Sclose(iter_args->mem_space_id[i]); - iter_args->mem_space_id[i] = H5Screate_simple(1, &n_elem, NULL); - } - else { - memcpy(iter_args->buf[i], buf[i], buf_size); - } - } - } -#endif - - if (NULL != task_elt->h5_state && H5VLfree_lib_state(task_elt->h5_state) < 0) - fprintf(fout_g, " [ ABT ERROR] %s H5VLfree_lib_state failed\n", __func__); - - // Retrieve current library state - if (H5VLretrieve_lib_state(&task_elt->h5_state) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5VLretrieve_lib_state failed\n", __func__); - goto done; - } - - found_task = 1; - func_log(__func__, "merged multi-dset write"); - break; - } - if (found_task) - break; - is_first = 0; - } // End task_elt - - if (ABT_mutex_unlock(aid->qhead.head_mutex) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); - return -1; - } - - func_leave(__func__); - -done: - return found_task; -} // End async_dataset_write_merge_mdset -#endif // #ifdef ENABLE_MERGE_DSET - -#else -// < 1.13.3 -static void -async_dataset_write_fn(void *foo) -{ - hbool_t acquired = false; - unsigned int mutex_count = 1; - int attempt_count = 0; - int is_lock = 0, count = 0; - hbool_t is_lib_state_restored = false; - ABT_pool *pool_ptr; - async_task_t *task = (async_task_t *)foo; - async_dataset_write_args_t *args = (async_dataset_write_args_t *)(task->args); - herr_t status; - -#ifdef ENABLE_TIMING - task->start_time = clock(); -#endif - - func_enter(__func__, NULL); - - assert(args); - assert(task); - assert(task->async_obj); - assert(task->async_obj->magic == ASYNC_MAGIC); - - pool_ptr = task->async_obj->pool_ptr; - - func_log(__func__, "trying to acquire global lock"); - - if ((attempt_count = check_app_acquire_mutex_fn(task, &mutex_count, &acquired)) < 0) - goto done; - - func_log_int1(__func__, "global lock acquired, mutex_count", mutex_count); - - /* Update the dependent parent object if it is NULL */ - if (NULL == args->dset) { - if (NULL != task->parent_obj->under_object) { - args->dset = task->parent_obj->under_object; - } - else { - if (check_parent_task(task->parent_obj) != 0) { - task->err_stack = H5Ecreate_stack(); - H5Eappend_stack(task->err_stack, task->parent_obj->create_task->err_stack, false); - H5Epush(task->err_stack, __FILE__, __func__, __LINE__, async_error_class_g, H5E_VOL, - H5E_CANTCREATE, "Parent task failed"); - -#ifdef PRINT_ERROR_STACK - H5Eprint2(task->err_stack, stderr); -#endif - fprintf(fout_g, " [ ABT ERROR] %s obj is NULL\n", __func__); - goto done; - } - } - } - - // Restore previous library state - assert(task->h5_state); - if (H5VLOPEN_LIB_CONTEXT(&task->h5_context) < 0) { - fprintf(fout_g, " [ ABT ERROR] %s %s failed\n", __func__, FUNC_H5VLOPEN_LIB_CONTEXT); - goto done; - } - if (H5VLrestore_lib_state(task->h5_state) < 0) { - fprintf(fout_g, " [ ABT ERROR] %s H5VLrestore_lib_state failed\n", __func__); - goto done; - } - is_lib_state_restored = true; - - /* Acquire async obj mutex and set the obj */ - assert(task->async_obj->obj_mutex); - assert(task->async_obj->magic == ASYNC_MAGIC); - while (1) { - if (count > 10000) { - fprintf(fout_g, " [ ABT DBG] %s cannot acquire object lock in 10s!\n", __func__); - break; - } - - if (ABT_mutex_trylock(task->async_obj->obj_mutex) == ABT_SUCCESS) { - is_lock = 1; - break; - } - else { - fprintf(fout_g, " [ ABT DBG] %s error with try_lock\n", __func__); - break; - } - usleep(1000); - count++; - } - - func_log(__func__, "execute start"); - - /* Try executing operation, without default error stack handling */ - H5E_BEGIN_TRY - { - status = H5VLdataset_write(args->dset, task->under_vol_id, args->mem_type_id, args->mem_space_id, - args->file_space_id, args->plist_id, (const void *)args->buf, NULL); - check_app_wait(attempt_count + 4, __func__); - } - H5E_END_TRY - if (status < 0) { - if ((task->err_stack = H5Eget_current_stack()) < 0) - fprintf(fout_g, " [ ABT ERROR] %s H5Eget_current_stack failed\n", __func__); - fprintf(fout_g, " [ ABT LOG] Argobots execute %s failed\n", __func__); -#ifdef PRINT_ERROR_STACK - H5Eprint2(task->err_stack, stderr); -#endif - goto done; - } - - func_log(__func__, "execute success"); - -done: - if (is_lib_state_restored && H5VLCLOSE_LIB_CONTEXT(task->h5_context) < 0) - fprintf(fout_g, " [ ABT ERROR] %s %s failed\n", __func__, FUNC_H5VLCLOSE_LIB_CONTEXT); - if (NULL != task->h5_state && H5VLfree_lib_state(task->h5_state) < 0) - fprintf(fout_g, " [ ABT ERROR] %s H5VLfree_lib_state failed\n", __func__); - task->h5_state = NULL; - - if (args->mem_type_id > 0) - H5Tclose(args->mem_type_id); - if (args->mem_space_id > H5S_PLIST && args->mem_space_id < H5S_UNLIMITED) - H5Sclose(args->mem_space_id); - if (args->file_space_id > H5S_PLIST && args->file_space_id < H5S_UNLIMITED) - H5Sclose(args->file_space_id); - if (args->plist_id > 0) - H5Pclose(args->plist_id); - - if (is_lock == 1) { - if (ABT_mutex_unlock(task->async_obj->obj_mutex) != ABT_SUCCESS) - fprintf(fout_g, " [ ABT ERROR] %s ABT_mutex_unlock failed\n", __func__); - } - - ABT_eventual_set(task->eventual, NULL, 0); - task->in_abt_pool = 0; - task->is_done = 1; - - /* remove_task_from_queue(task, __func__); */ - - func_log(__func__, "release global lock"); - - if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ ABT ERROR] %s H5TSmutex_release failed\n", __func__); - } - if (async_instance_g && NULL != async_instance_g->qhead.queue && async_instance_g->start_abt_push) - push_task_to_abt_pool(&async_instance_g->qhead, *pool_ptr, __func__); - -#ifdef ENABLE_WRITE_MEMCPY - if (args->free_buf && args->buf) { - free(args->buf); - async_instance_g->used_mem -= args->data_size; - func_log(__func__, "released dset memcpy") - } -#endif - free(args); - task->args = NULL; - -#ifdef ENABLE_TIMING - task->end_time = clock(); -#endif - func_leave(__func__); - - return; -} // End async_dataset_write_fn < 1.13.3 - -static herr_t -async_dataset_write(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_type_id, hid_t mem_space_id, - hid_t file_space_id, hid_t plist_id, const void *buf, void **req) -{ - async_task_t *async_task = NULL; - async_dataset_write_args_t *args = NULL; - bool lock_parent = false; - bool is_blocking = false; - hbool_t acquired = false; - unsigned int mutex_count = 1; - - func_enter(__func__, NULL); - - assert(aid); - assert(parent_obj); - assert(parent_obj->magic == ASYNC_MAGIC); - - async_instance_g->prev_push_state = async_instance_g->start_abt_push; - - if ((args = (async_dataset_write_args_t *)calloc(1, sizeof(async_dataset_write_args_t))) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with calloc\n", __func__); - goto error; - } - /* create a new task and insert into its file task list */ - if ((async_task = create_async_task()) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with calloc\n", __func__); - goto error; - } + // Realloc and fill the args with current write + iter_args->dset = (void **)realloc(iter_args->dset, total_cnt * sizeof(void *)); + iter_args->buf = (void **)realloc(iter_args->buf, total_cnt * sizeof(void *)); + iter_args->mem_type_id = (hid_t *)realloc(iter_args->mem_type_id, total_cnt * sizeof(hid_t)); + iter_args->mem_space_id = (hid_t *)realloc(iter_args->mem_space_id, total_cnt * sizeof(hid_t)); + iter_args->file_space_id = (hid_t *)realloc(iter_args->file_space_id, total_cnt * sizeof(hid_t)); + for (size_t i = iter_cnt; i < total_cnt; i++) { + iter_args->dset[i] = parent_obj[i - iter_cnt]->under_object; + if (mem_type_id[i - iter_cnt] > 0) + iter_args->mem_type_id[i] = H5Tcopy(mem_type_id[i - iter_cnt]); + if (mem_space_id[i - iter_cnt] > H5S_PLIST && mem_space_id[i - iter_cnt] < H5S_UNLIMITED) + iter_args->mem_space_id[i] = H5Scopy(mem_space_id[i - iter_cnt]); + else + iter_args->mem_space_id[i] = mem_space_id[i - iter_cnt]; + if (file_space_id[i - iter_cnt] > H5S_PLIST && file_space_id[i - iter_cnt] < H5S_UNLIMITED) + iter_args->file_space_id[i] = H5Scopy(file_space_id[i - iter_cnt]); + else + iter_args->file_space_id[i] = file_space_id[i - iter_cnt]; + iter_args->buf[i] = (void *)buf[i - iter_cnt]; + } + // Replace with the new req + // TODO: what to do with old req? + iter_args->req = req; + iter_args->count = total_cnt; -#ifdef ENABLE_TIMING - async_task->create_time = clock(); -#endif - args->dset = parent_obj->under_object; - if (mem_type_id > 0) - args->mem_type_id = H5Tcopy(mem_type_id); - if (mem_space_id > H5S_PLIST && mem_space_id < H5S_UNLIMITED) - args->mem_space_id = H5Scopy(mem_space_id); - else - args->mem_space_id = mem_space_id; - if (file_space_id > H5S_PLIST && file_space_id < H5S_UNLIMITED) - args->file_space_id = H5Scopy(file_space_id); - else - args->file_space_id = file_space_id; - if (plist_id > 0) - args->plist_id = H5Pcopy(plist_id); - args->buf = (void *)buf; - args->req = req; + task_elt->parent_objs = (struct H5VL_async_t **)realloc( + task_elt->parent_objs, total_cnt * sizeof(struct H5VL_async_t *)); + for (size_t i = iter_cnt; i < total_cnt; i++) + task_elt->parent_objs[i] = parent_obj[i - iter_cnt]; #ifdef ENABLE_WRITE_MEMCPY - hsize_t buf_size = 0; - if (parent_obj->data_size > 0 && args->file_space_id == H5S_ALL) { - buf_size = parent_obj->data_size; - } - else { - buf_size = H5Tget_size(mem_type_id) * H5Sget_select_npoints(mem_space_id); + hsize_t buf_size = 0; + for (size_t i = iter_cnt; i < total_cnt; i++) { + if (parent_obj[i]->data_size > 0 && + (iter_args->file_space_id[i] == H5S_ALL || iter_args->mem_space_id[i] == H5S_ALL)) { + buf_size = parent_obj[i]->data_size; + } + else { + buf_size = H5Tget_size(mem_type_id[i]) * H5Sget_select_npoints(mem_space_id[i]); #ifdef ENABLE_DBG_MSG - if (buf_size == 0) - fprintf(fout_g, " [ASYNC VOL ERROR] %s with getting dataset size\n", __func__); + if (buf_size == 0) + fprintf(fout_g, " [ASYNC VOL ERROR] %s with getting dataset size\n", __func__); #endif - } - - /* fprintf(fout_g, "buf size = %llu\n", buf_size); */ + } - // Get available system memory - hsize_t avail_mem = (hsize_t)get_avphys_pages() * sysconf(_SC_PAGESIZE); + /* fprintf(fout_g, "buf size = %llu\n", buf_size); */ - if (async_instance_g->used_mem + buf_size > async_instance_g->max_mem) { - is_blocking = true; - args->buf = (void *)buf; - fprintf(fout_g, - " [ ABT INFO] %d write size %lu larger than async memory limit %lu, switch to " - "synchronous write\n", - async_instance_g->mpi_rank, buf_size, async_instance_g->max_mem); - } - else if (buf_size > avail_mem) { - is_blocking = true; - args->buf = (void *)buf; - fprintf(fout_g, - " [ ABT INFO] %d write size %lu larger than available memory %lu, switch to " - "synchronous write\n", - async_instance_g->mpi_rank, buf_size, avail_mem); - } - else if (buf_size > 0) { - if (NULL == (args->buf = malloc(buf_size))) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s malloc failed!\n", __func__); - goto done; - } - async_instance_g->used_mem += buf_size; - args->free_buf = true; - args->data_size = buf_size; - - // If is contiguous space, no need to go through gather process as it can be costly - if (1 != is_contig_memspace(mem_space_id)) { - /* fprintf(fout_g," [ASYNC VOL LOG] %s will gather!\n", __func__); */ - H5Dgather(mem_space_id, buf, mem_type_id, buf_size, args->buf, NULL, NULL); - hsize_t elem_size = H5Tget_size(mem_type_id); - if (elem_size == 0) - elem_size = 1; - hsize_t n_elem = (hsize_t)(buf_size / elem_size); - if (args->mem_space_id > 0) - H5Sclose(args->mem_space_id); - args->mem_space_id = H5Screate_simple(1, &n_elem, NULL); - } - else { - memcpy(args->buf, buf, buf_size); - } - } -#endif + // Get available system memory + hsize_t avail_mem = (hsize_t)get_avphys_pages() * sysconf(_SC_PAGESIZE); - if (req) { - H5VL_async_t *new_req; - if ((new_req = H5VL_async_new_obj(NULL, parent_obj->under_vol_id)) == NULL) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with request object calloc\n", __func__); - goto error; - } - new_req->my_task = async_task; - /* new_req->under_object = new_req; */ - new_req->file_async_obj = parent_obj->file_async_obj; - *req = (void *)new_req; - } - else { - is_blocking = true; - async_instance_g->start_abt_push = true; - } + if (async_instance_g->used_mem + buf_size > async_instance_g->max_mem) { + fprintf(fout_g, + " [ ABT INFO] %d write size %lu larger than async memory limit " + "%lu, switch to " + "synchronous write\n", + async_instance_g->mpi_rank, buf_size, async_instance_g->max_mem); + } + else if (buf_size > avail_mem) { + fprintf(fout_g, + " [ ABT INFO] %d write size %lu larger than available memory %lu, " + "switch to " + "synchronous write\n", + async_instance_g->mpi_rank, buf_size, avail_mem); + } + else if (buf_size > 0) { + if (NULL == (iter_args->buf[i] = malloc(buf_size))) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s malloc failed!\n", __func__); + goto done; + } + async_instance_g->used_mem += buf_size; + iter_args->free_buf = true; + iter_args->data_size += buf_size; - // Retrieve current library state - if (H5VLretrieve_lib_state(&async_task->h5_state) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5VLretrieve_lib_state failed\n", __func__); - goto done; - } + // If is contiguous space, no need to go through gather process as it can be + // costly + if (1 != is_contig_memspace(mem_space_id[i])) { + /* fprintf(fout_g," [ASYNC VOL LOG] %s will gather!\n", __func__); */ + H5Dgather(mem_space_id[i], buf[i], mem_type_id[i], buf_size, iter_args->buf[i], NULL, + NULL); + hsize_t elem_size = H5Tget_size(mem_type_id[i]); + if (elem_size == 0) + elem_size = 1; + hsize_t n_elem = (hsize_t)(buf_size / elem_size); + if (iter_args->mem_space_id[i] > 0) + H5Sclose(iter_args->mem_space_id[i]); + iter_args->mem_space_id[i] = H5Screate_simple(1, &n_elem, NULL); + } + else { + memcpy(iter_args->buf[i], buf[i], buf_size); + } + } + } +#endif - async_task->func = async_dataset_write_fn; - async_task->args = args; - async_task->op = WRITE; - async_task->under_vol_id = parent_obj->under_vol_id; - async_task->async_obj = parent_obj; - async_task->parent_obj = parent_obj; + if (NULL != task_elt->h5_state && H5VLfree_lib_state(task_elt->h5_state) < 0) + fprintf(fout_g, " [ ABT ERROR] %s H5VLfree_lib_state failed\n", __func__); - /* Lock parent_obj */ - while (1) { - if (parent_obj->obj_mutex && ABT_mutex_trylock(parent_obj->obj_mutex) == ABT_SUCCESS) { - lock_parent = true; - break; - } - // Temp release global lock in case background is waiting - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - usleep(1000); - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto error; + // Retrieve current library state + if (H5VLretrieve_lib_state(&task_elt->h5_state) < 0) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s H5VLretrieve_lib_state failed\n", __func__); + goto done; } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - } - parent_obj->task_cnt++; - parent_obj->pool_ptr = &aid->pool; - /* Check if its parent has valid object */ - if (NULL == parent_obj->under_object) { - if (NULL != parent_obj->create_task) { - add_task_to_queue(&aid->qhead, async_task, DEPENDENT); - } - else { - fprintf(fout_g, " [ASYNC VOL ERROR] %s parent task not created\n", __func__); - goto error; + found_task = 1; + func_log(__func__, "merged multi-dset write"); + break; } - } - else { -#ifdef MPI_VERSION - H5FD_mpio_xfer_t xfer_mode; - H5Pget_dxpl_mpio(plist_id, &xfer_mode); - if (xfer_mode == H5FD_MPIO_COLLECTIVE) - add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); - else -#endif - add_task_to_queue(&aid->qhead, async_task, REGULAR); - } + if (found_task) + break; + is_first = 0; + } // End task_elt - if (ABT_mutex_unlock(parent_obj->obj_mutex) != ABT_SUCCESS) { + if (ABT_mutex_unlock(aid->qhead.head_mutex) != ABT_SUCCESS) { fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); - goto error; - } - lock_parent = false; - if (aid->ex_delay == false && !async_instance_g->pause) { - if (get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - } - - if (is_blocking) { - if (block_and_wait_task(async_task, __func__) < 0) - goto error; + return -1; } - // Restore async operation state - async_instance_g->start_abt_push = async_instance_g->prev_push_state; - func_leave(__func__); done: - return 0; -error: - fprintf(fout_g, " [ASYNC VOL ERROR] %s\n", __func__); - // Restore async operation state - async_instance_g->start_abt_push = async_instance_g->prev_push_state; - - if (lock_parent) { - if (ABT_mutex_unlock(parent_obj->obj_mutex) != ABT_SUCCESS) - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); - } - if (async_task && async_task->args) { - free(async_task->args); - if (async_task) - async_task->args = NULL; - } - return -1; -} // End async_dataset_write < 1.13.3 -#endif + return found_task; +} // End async_dataset_write_merge_mdset +#endif // #ifdef ENABLE_MERGE_DSET static void async_dataset_get_fn(void *foo) @@ -21278,7 +20551,6 @@ H5VL_async_dataset_open(void *obj, const H5VL_loc_params_t *loc_params, const ch *------------------------------------------------------------------------- */ static herr_t -#if H5_VERSION_GE(1, 13, 3) H5VL_async_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t mem_space_id[], hid_t file_space_id[], hid_t plist_id, void *buf[], void **req) { @@ -21347,38 +20619,7 @@ H5VL_async_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t m } return ret_value; -} /* end H5VL_async_dataset_read() > 1.13.3 */ -#else -H5VL_async_dataset_read(void *dset, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, - hid_t plist_id, void *buf, void **req) -{ - H5VL_async_t *o = (H5VL_async_t *)dset; - herr_t ret_value; - -#ifdef ENABLE_ASYNC_LOGGING - printf("------- ASYNC VOL DATASET Read\n"); -#endif - - H5VL_async_dxpl_set_disable_implicit(plist_id); - H5VL_async_dxpl_set_pause(plist_id); - - if (H5VL_async_is_implicit_disabled(DSET_RW_OP, __func__)) { - ret_value = H5VLdataset_read(o->under_object, o->under_vol_id, mem_type_id, mem_space_id, - file_space_id, plist_id, buf, req); - /* Check for async request */ - if (req && *req) - *req = H5VL_async_new_obj(*req, o->under_vol_id); - } - else { - if ((ret_value = async_dataset_read(async_instance_g, o, mem_type_id, mem_space_id, file_space_id, - plist_id, buf, req)) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] with async_dataset_read\n"); - } - } - - return ret_value; -} /* end H5VL_async_dataset_read() < 1.13.3 */ -#endif +} /* end H5VL_async_dataset_read() */ /*------------------------------------------------------------------------- * Function: H5VL_async_dataset_write @@ -21390,7 +20631,6 @@ H5VL_async_dataset_read(void *dset, hid_t mem_type_id, hid_t mem_space_id, hid_t * *------------------------------------------------------------------------- */ -#if H5_VERSION_GE(1, 13, 3) static herr_t H5VL_async_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t mem_space_id[], hid_t file_space_id[], hid_t plist_id, const void *buf[], void **req) @@ -21461,40 +20701,7 @@ H5VL_async_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t } return ret_value; -} /* end H5VL_async_dataset_write() > 1.13.3 */ -#else -static herr_t -H5VL_async_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, - hid_t plist_id, const void *buf, void **req) -{ - H5VL_async_t *o = (H5VL_async_t *)dset; - herr_t ret_value; - -#ifdef ENABLE_ASYNC_LOGGING - printf("------- ASYNC VOL DATASET Write\n"); -#endif - - H5VL_async_dxpl_set_disable_implicit(plist_id); - H5VL_async_dxpl_set_pause(plist_id); - - if (H5VL_async_is_implicit_disabled(DSET_RW_OP, __func__)) { - ret_value = H5VLdataset_write(o->under_object, o->under_vol_id, mem_type_id, mem_space_id, - file_space_id, plist_id, buf, req); - - /* Check for async request */ - if (req && *req) - *req = H5VL_async_new_obj(*req, o->under_vol_id); - } - else { - if ((ret_value = async_dataset_write(async_instance_g, o, mem_type_id, mem_space_id, file_space_id, - plist_id, buf, req)) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] with async_dataset_write\n"); - } - } - - return ret_value; -} /* end H5VL_async_dataset_write() < 1.13.3*/ -#endif +} /* end H5VL_async_dataset_write() */ /*------------------------------------------------------------------------- * Function: H5VL_async_dataset_get @@ -22034,11 +21241,7 @@ H5VL_async_file_create(const char *name, unsigned flags, hid_t fcpl_id, hid_t fa */ #ifdef MPI_VERSION { -#if H5_VERSION_GE(1, 13, 3) uint64_t cap_flags = 0; -#else - unsigned cap_flags = 0; -#endif /* Query the capability flags for the underlying VOL connector */ if (H5VLintrospect_get_cap_flags(info->under_vol_info, info->under_vol_id, &cap_flags) < 0) @@ -22137,11 +21340,7 @@ H5VL_async_file_open(const char *name, unsigned flags, hid_t fapl_id, hid_t dxpl */ #ifdef MPI_VERSION { -#if H5_VERSION_GE(1, 13, 3) uint64_t cap_flags = 0; -#else - unsigned cap_flags = 0; -#endif /* Query the capability flags for the underlying VOL connector */ if (H5VLintrospect_get_cap_flags(info->under_vol_info, info->under_vol_id, &cap_flags) < 0) @@ -23344,11 +22543,7 @@ H5VL_async_introspect_get_conn_cls(void *obj, H5VL_get_conn_lvl_t lvl, const H5V *------------------------------------------------------------------------- */ herr_t -#if H5_VERSION_GE(1, 13, 3) H5VL_async_introspect_get_cap_flags(const void *_info, uint64_t *cap_flags) -#else -H5VL_async_introspect_get_cap_flags(const void *_info, unsigned *cap_flags) -#endif { const H5VL_async_info_t *info = (const H5VL_async_info_t *)_info; herr_t ret_value; diff --git a/test/async_test_serial_mdset.c b/test/async_test_serial_mdset.c index 64a77e8..812c574 100644 --- a/test/async_test_serial_mdset.c +++ b/test/async_test_serial_mdset.c @@ -86,7 +86,6 @@ main(int argc, char *argv[]) if (print_dbg_msg) fprintf(stderr, "H5Dcreate 1 done\n"); -#if H5_VERSION_GE(1, 13, 3) if (print_dbg_msg) fprintf(stderr, "H5Dwrite multi start\n"); status = H5Dwrite_multi_async(2, dset_ids, mem_type_ids, mspace_id, fspace_id, H5P_DEFAULT, @@ -133,7 +132,6 @@ main(int argc, char *argv[]) if (ret != -1) fprintf(stderr, "Finished verification\n"); -#endif H5ESclose(es_id); H5Sclose(fspace_id[0]);