Merge pull request #2238 from GEOS-ESM/bugfix/wjiang/writing_same_file
Fix a bug so that MultigroupServer does not allow a file written by m…
mathomp4 committed Jul 18, 2023
2 parents 46a6dd2 + 54391aa commit e05c1f8
Showing 4 changed files with 54 additions and 5 deletions.
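
The change below threads the output file name through the server handshake: the front root sends it to the back captain alongside the collection id, the captain tracks the set of files currently being written (a gFTL2 StringSet named FilesBeingWritten) and refuses to dispatch work for a file until its previous write has finished, and each writer reports the finished file name back when it goes idle. As a rough illustration of that bookkeeping (not MAPL or gFTL code), the following minimal sketch uses a plain allocatable character array as a stand-in for the StringSet; the names busy_files_mod, busy_files_t, is_busy, mark_busy, and mark_done are invented for this example.

```fortran
! Standalone sketch of the "files being written" bookkeeping, assuming a
! plain array instead of gFTL2's StringSet. All names here are illustrative.
module busy_files_mod
   implicit none
   private
   public :: busy_files_t, FNAME_LEN

   integer, parameter :: FNAME_LEN = 512   ! same fixed name length as the patch

   type :: busy_files_t
      character(len=FNAME_LEN), allocatable :: names(:)
   contains
      procedure :: is_busy    ! analogue of find(FileName) /= end()
      procedure :: mark_busy  ! analogue of insert(FileName)
      procedure :: mark_done  ! analogue of erase(iter) after FileDone arrives
   end type busy_files_t

contains

   logical function is_busy(this, file_name)
      class(busy_files_t), intent(in) :: this
      character(len=*), intent(in) :: file_name
      is_busy = .false.
      if (allocated(this%names)) is_busy = any(this%names == file_name)
   end function is_busy

   subroutine mark_busy(this, file_name)
      class(busy_files_t), intent(inout) :: this
      character(len=*), intent(in) :: file_name
      character(len=FNAME_LEN) :: padded
      padded = file_name
      if (.not. allocated(this%names)) allocate(this%names(0))
      this%names = [this%names, padded]
   end subroutine mark_busy

   subroutine mark_done(this, file_name)
      class(busy_files_t), intent(inout) :: this
      character(len=*), intent(in) :: file_name
      if (.not. allocated(this%names)) return
      this%names = pack(this%names, this%names /= file_name)
   end subroutine mark_done

end module busy_files_mod
```

In the actual patch, dispatch_work performs these three steps with the StringSet's find, insert, and erase, interleaved with MPI probes of the writer ranks.
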
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Deprecated

## [2.39.7] - 2023-07-18

### Fixed

- Fix a bug so that MultiGroupServer does not allow a file to be written by multiple processes at the same time.

## [2.39.6] - 2023-07-18

### Changed
@@ -30,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [2.39.5] - 2023-07-10

### Fixed

- Fixed logic in generating the names of the split fields. If the alias field in the History.rc has separators (;), each substring is used to name the resulting fields. If there are no separators, this will be the exact name of the first split field.

## [2.39.4] - 2023-06-23
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -8,7 +8,7 @@ endif ()

project (
MAPL
VERSION 2.39.6
VERSION 2.39.7
LANGUAGES Fortran CXX C) # Note - CXX is required for ESMF

# Set the possible values of build type for cmake-gui
2 changes: 1 addition & 1 deletion pfio/CMakeLists.txt
@@ -117,7 +117,7 @@ if (BUILD_WITH_PFLOGGER)
endif ()

esma_add_library (${this} SRCS ${srcs} DEPENDENCIES MAPL.shared MAPL.profiler NetCDF::NetCDF_Fortran NetCDF::NetCDF_C TYPE ${MAPL_LIBRARY_TYPE})
target_link_libraries (${this} PUBLIC GFTL_SHARED::gftl-shared PFLOGGER::pflogger PRIVATE MPI::MPI_Fortran)
target_link_libraries (${this} PUBLIC GFTL::gftl-v2 GFTL_SHARED::gftl-shared-v2 PFLOGGER::pflogger PRIVATE MPI::MPI_Fortran)
# CMake has an OpenMP issue with NAG Fortran: https://gitlab.kitware.com/cmake/cmake/-/issues/21280
if (NOT CMAKE_Fortran_COMPILER_ID MATCHES "NAG")
target_link_libraries(${this} PRIVATE OpenMP::OpenMP_Fortran)
48 changes: 45 additions & 3 deletions pfio/MultiGroupServer.F90
@@ -46,6 +46,7 @@ module pFIO_MultiGroupServerMod
use pFIO_AbstractRequestHandleMod
use pFIO_FileMetadataMod
use pFIO_IntegerMessageMapMod
use gFTL2_StringSet, StringSetIterator => SetIterator
use mpi
use pFlogger, only: logging, Logger

@@ -87,6 +88,8 @@ module pFIO_MultiGroupServerMod
module procedure new_MultiGroupServer
end interface MultiGroupServer

integer, parameter :: FNAME_LEN = 512

contains

function new_MultiGroupServer(server_comm, port_name, nwriter_per_node, with_profiler, rc) result(s)
@@ -335,6 +338,7 @@ subroutine receive_output_data(this, rc)
type (HistoryCollection), pointer :: hist_collection
integer, pointer :: i_ptr(:)
class (AbstractRequestHandle), pointer :: handle
character(len=FNAME_LEN) :: FileName

if (associated(ioserver_profiler)) call ioserver_profiler%start("receive_data")
client_num = this%threads%size()
@@ -395,6 +399,15 @@ subroutine receive_output_data(this, rc)
if (this%I_am_front_root) then
collection_id = collection_ids%at(collection_counter)
call Mpi_Send(collection_id, 1, MPI_INTEGER, this%back_ranks(1), this%back_ranks(1), this%server_comm, ierror)
msg => f_d_ms(collection_counter)%msg_vec%at(1) ! just pick the first one; all messages should have the same filename
select type (q=>msg)
class is (AbstractCollectiveDataMessage)
Filename = q%file_name
call Mpi_Send(FileName, FNAME_LEN, MPI_CHARACTER, this%back_ranks(1), this%back_ranks(1), this%server_comm, ierror)
class default
_FAIL("yet to be implemented")
end select

! here thread_ptr can point to any thread
hist_collection => thread_ptr%hist_collections%at(collection_id)
call hist_collection%fmd%serialize(buffer)
@@ -438,6 +451,7 @@ subroutine start_back(this, rc)
integer, parameter :: stag = 6782

integer :: status
type (StringSet) :: FilesBeingWritten

allocate(this%serverthread_done_msgs(1))
this%serverthread_done_msgs(:) = .false.
@@ -462,6 +476,7 @@ subroutine start_back_captain(rc)
integer :: i, no_job, local_rank, node_rank, nth_writer
integer :: terminate, idle_writer, ierr
integer :: MPI_STAT(MPI_STATUS_SIZE)
character(len=FNAME_LEN) :: FileName

nwriter_per_node = this%nwriter/this%Node_Num
allocate(num_idlePEs(0:this%Node_Num-1))
@@ -482,8 +497,12 @@ subroutine start_back_captain(rc)
this%front_ranks(1), this%back_ranks(1), this%server_comm, &
MPI_STAT, ierr)
if (collection_id == -1) exit

call MPI_recv( FileName, FNAME_LEN , MPI_CHARACTER, &
this%front_ranks(1), this%back_ranks(1), this%server_comm, &
MPI_STAT, ierr)
! 2) get an idle processor and notify front root
call dispatch_work(collection_id, idleRank, num_idlePEs, rc=status)
call dispatch_work(collection_id, idleRank, num_idlePEs, FileName, rc=status)
_VERIFY(status)
enddo ! while .true.

@@ -498,16 +517,19 @@ subroutine start_back_captain(rc)
_RETURN(_SUCCESS)
end subroutine start_back_captain

subroutine dispatch_work(collection_id, idleRank, num_idlePEs, rc)
subroutine dispatch_work(collection_id, idleRank, num_idlePEs, FileName, rc)
integer, intent(in) :: collection_id
integer, intent(inout) :: idleRank(0:,0:)
integer, intent(inout) :: num_idlePEs(0:)
character(*), intent(in) :: FileName
integer, optional, intent(out) :: rc

integer :: MPI_STAT(MPI_STATUS_SIZE)
integer :: local_rank, idle_writer, nth_writer, node_rank
integer :: i, ierr, nwriter_per_node
logical :: flag
character(len=FNAME_LEN) :: FileDone
type (StringSetIterator) :: iter

! 2.1) try to retrieve idle writers
! keep looping (waiting) until there are idle processors
@@ -526,10 +548,21 @@ subroutine dispatch_work(collection_id, idleRank, num_idlePEs, rc)
num_idlePEs(node_rank) = num_idlePEs(node_rank) + 1
nth_writer = mod(local_rank, nwriter_per_node)
idleRank(node_rank, nth_writer) = local_rank

call MPI_recv(FileDone, FNAME_LEN, MPI_CHARACTER, &
local_rank, stag+1, this%back_comm, &
MPI_STAT, ierr)

iter = FilesBeingWritten%find(FileDone)
_ASSERT( iter /= FilesBeingWritten%end(), "FileDone should be in the set")
iter = FilesBeingWritten%erase(iter)
endif
enddo
! if there is no idle processor, get back to probe
if (all(num_idlePEs == 0)) cycle
! if this file is still being written, get back to probe
iter = FilesBeingWritten%find(FileName)
if (iter /= FilesBeingWritten%end()) cycle

! get the node with the most idle processors
node_rank = maxloc(num_idlePEs, dim=1) - 1
@@ -541,7 +574,8 @@ subroutine dispatch_work(collection_id, idleRank, num_idlePEs, rc)
exit
enddo
_ASSERT(1<= idle_writer .and. idle_writer <= this%nwriter-1, "wrong local rank of writer")
exit ! exit while loop after get one idle processor
call FilesBeingWritten%insert(FileName)
exit ! exit the loop once an idle processor is found and the file is not being written
enddo ! while, get one idle writer

! 2.2) tell front comm which idle_writer is ready
@@ -559,6 +593,7 @@ subroutine terminate_back_writers(idleRank, rc)
integer :: MPI_STAT(MPI_STATUS_SIZE)
integer :: node_rank, local_rank, nth_writer
integer :: ierr, no_job, nwriter_per_node, idle_writer
character(len=FNAME_LEN) :: FileDone

no_job = -1
nwriter_per_node = size(idleRank, 2)
@@ -574,6 +609,9 @@ subroutine terminate_back_writers(idleRank, rc)
call MPI_recv( idle_writer, 1, MPI_INTEGER, &
local_rank, stag, this%back_comm, &
MPI_STAT, ierr)
call MPI_recv( FileDone, FNAME_LEN, MPI_CHARACTER, &
local_rank, stag+1, this%back_comm, &
MPI_STAT, ierr)
_ASSERT(local_rank == idle_writer, "local_rank and idle_writer should match")
call MPI_send(no_job, 1, MPI_INTEGER, local_rank, local_rank, this%back_comm, ierr)
endif
@@ -612,6 +650,7 @@ subroutine start_back_writers(rc)
type(AdvancedMeter) :: file_timer
real(kind=REAL64) :: time
character(len=:), allocatable :: filename
character(len=FNAME_LEN) :: FileDone
real(kind=REAL64) :: file_size, speed

class(Logger), pointer :: lgr
@@ -828,7 +867,10 @@ subroutine start_back_writers(rc)
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
! telling captain it is idle by sending its own rank
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

call MPI_send(back_local_rank, 1, MPI_INTEGER, 0, stag, this%back_comm , ierr)
FileDone = Filename
call MPI_send(FileDone, FNAME_LEN, MPI_CHARACTER, 0, stag+1, this%back_comm , ierr)
enddo
_RETURN(_SUCCESS)
end subroutine start_back_writers
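
Putting the pieces of the diff together, here is a hypothetical, MPI-free walk-through of the gating rule that dispatch_work now enforces: a request whose file is still in FilesBeingWritten is not handed to a writer (the captain keeps probing until a FileDone message clears the entry), and the file is marked busy again when it is finally dispatched. The demo reuses the sketch module shown earlier; the program name and file name are made up for illustration.

```fortran
! Hypothetical serial demo of the captain's gating rule (no MPI involved).
program dispatch_gate_demo
   use busy_files_mod, only: busy_files_t
   implicit none
   type(busy_files_t) :: files_being_written
   character(len=*), parameter :: fname = 'example_collection.nc4'  ! made-up name

   ! 1) Captain receives (collection_id, FileName) from the front root
   !    and the file is not busy, so it is dispatched to an idle writer.
   if (.not. files_being_written%is_busy(fname)) then
      call files_being_written%mark_busy(fname)
      print *, 'dispatched: ', fname
   end if

   ! 2) A second request for the same file arrives while the writer is
   !    still working: the captain must wait (in the real code it keeps
   !    probing the writer ranks instead of dispatching).
   if (files_being_written%is_busy(fname)) then
      print *, 'still being written, deferring: ', fname
   end if

   ! 3) The writer reports FileDone (the stag+1 message in the patch), so
   !    the captain erases the entry and the file may be dispatched again.
   call files_being_written%mark_done(fname)
   print *, 'busy after completion? ', files_being_written%is_busy(fname)
end program dispatch_gate_demo
```

In the patch itself this wait happens inside the writer-probe loop, so a writer's FileDone message both frees that writer and unblocks its file for the next request.
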
