MultiCommServer.F90 Source File


This file depends on

sourcefile~~multicommserver.f90~~EfferentGraph sourcefile~multicommserver.f90 MultiCommServer.F90 sourcefile~abstractdatareference.f90 AbstractDataReference.F90 sourcefile~multicommserver.f90->sourcefile~abstractdatareference.f90 sourcefile~abstractdirectoryservice.f90 AbstractDirectoryService.F90 sourcefile~multicommserver.f90->sourcefile~abstractdirectoryservice.f90 sourcefile~abstractmessage.f90 AbstractMessage.F90 sourcefile~multicommserver.f90->sourcefile~abstractmessage.f90 sourcefile~abstractserver.f90 AbstractServer.F90 sourcefile~multicommserver.f90->sourcefile~abstractserver.f90 sourcefile~abstractsocket.f90 AbstractSocket.F90 sourcefile~multicommserver.f90->sourcefile~abstractsocket.f90 sourcefile~abstractsocketvector.f90 AbstractSocketVector.F90 sourcefile~multicommserver.f90->sourcefile~abstractsocketvector.f90 sourcefile~attribute.f90 Attribute.F90 sourcefile~multicommserver.f90->sourcefile~attribute.f90 sourcefile~baseserver.f90 BaseServer.F90 sourcefile~multicommserver.f90->sourcefile~baseserver.f90 sourcefile~collectivestagedatamessage.f90 CollectiveStageDataMessage.F90 sourcefile~multicommserver.f90->sourcefile~collectivestagedatamessage.f90 sourcefile~forwarddatamessage.f90 ForwardDataMessage.F90 sourcefile~multicommserver.f90->sourcefile~forwarddatamessage.f90 sourcefile~historycollection.f90 HistoryCollection.F90 sourcefile~multicommserver.f90->sourcefile~historycollection.f90 sourcefile~localmemreference.f90 LocalMemReference.F90 sourcefile~multicommserver.f90->sourcefile~localmemreference.f90 sourcefile~mapl_errorhandling.f90 MAPL_ErrorHandling.F90 sourcefile~multicommserver.f90->sourcefile~mapl_errorhandling.f90 sourcefile~mapl_keywordenforcer.f90 MAPL_KeywordEnforcer.F90 sourcefile~multicommserver.f90->sourcefile~mapl_keywordenforcer.f90 sourcefile~messagevector.f90 MessageVector.F90 sourcefile~multicommserver.f90->sourcefile~messagevector.f90 sourcefile~mpisocket.f90 MpiSocket.F90 sourcefile~multicommserver.f90->sourcefile~mpisocket.f90 sourcefile~netcdf4_fileformatter.f90 NetCDF4_FileFormatter.F90 sourcefile~multicommserver.f90->sourcefile~netcdf4_fileformatter.f90 sourcefile~pfio_constants.f90 pFIO_Constants.F90 sourcefile~multicommserver.f90->sourcefile~pfio_constants.f90 sourcefile~pfio_utilities.f90 pFIO_Utilities.F90 sourcefile~multicommserver.f90->sourcefile~pfio_utilities.f90 sourcefile~rdmareference.f90 RDMAReference.F90 sourcefile~multicommserver.f90->sourcefile~rdmareference.f90 sourcefile~serverthread.f90 ServerThread.F90 sourcefile~multicommserver.f90->sourcefile~serverthread.f90 sourcefile~serverthreadvector.f90 ServerThreadVector.F90 sourcefile~multicommserver.f90->sourcefile~serverthreadvector.f90 sourcefile~shmemreference.f90 ShmemReference.F90 sourcefile~multicommserver.f90->sourcefile~shmemreference.f90 sourcefile~simplecommsplitter.f90 SimpleCommSplitter.F90 sourcefile~multicommserver.f90->sourcefile~simplecommsplitter.f90 sourcefile~splitcommunicator.f90 SplitCommunicator.F90 sourcefile~multicommserver.f90->sourcefile~splitcommunicator.f90

Files dependent on this one

sourcefile~~multicommserver.f90~~AfferentGraph sourcefile~multicommserver.f90 MultiCommServer.F90 sourcefile~pfio.f90 pFIO.F90 sourcefile~pfio.f90->sourcefile~multicommserver.f90

Source Code

#include "MAPL_ErrLog.h"
#include "unused_dummy.H"

module pFIO_MultiCommServerMod
   use, intrinsic :: iso_c_binding, only: c_ptr
   use, intrinsic :: iso_c_binding, only: c_loc
   use, intrinsic :: iso_fortran_env, only: INT64
   use, intrinsic :: iso_c_binding, only: c_f_pointer
   use mapl_KeywordEnforcerMod
   use MAPL_ErrorHandlingMod
   use pFIO_ConstantsMod
   use pFIO_CollectiveStageDataMessageMod
   use pFIO_UtilitiesMod
   use pFIO_AbstractDirectoryServiceMod
   use pFIO_ServerThreadMod
   use pFIO_ServerThreadVectorMod
   use pFIO_AbstractSocketMod
   use pFIO_AbstractSocketVectorMod
   use pFIO_AbstractServerMod
   use gFTL_StringInteger64Map
   use pFIO_AbstractMessageMod
   use pFIO_AbstractDataReferenceMod
   use pFIO_AbstractDataReferenceVectorMod
   use pFIO_ShmemReferenceMod
   use pFIO_RDMAReferenceMod
   use pFIO_LocalMemReferenceMod
   use pFIO_MessageVectorMod
   use pFIO_MessageVectorUtilMod
   use pFIO_ForwardDataMessageMod
   use pFIO_NetCDF4_FileFormatterMod
   use pFIO_HistoryCollectionMod
   use pFIO_HistoryCollectionVectorMod
   use pFIO_HistoryCollectionVectorUtilMod
   use pFIO_BaseServerMod
   use pFIO_AttributeMod
   use pFIO_StringAttributeMapMod
   use pFIO_StringAttributeMapUtilMod
   use MAPL_SplitCommunicatorMod
   use MAPL_SimpleCommSplitterMod
   use pFIO_MpiSocketMod
   use mpi

   implicit none
   private

   public :: MultiCommServer

   type,extends (BaseServer) :: MultiCommServer
      character(len=:), allocatable :: port_name
      integer :: front_Comm
      integer :: back_Comm
      integer :: server_Comm
      integer :: nwriter ! nback or number of back end
      integer :: nfront  ! number of front end
      logical :: I_am_front_root
      logical :: I_am_back_root
      integer, allocatable :: back_ranks(:)
      integer, allocatable :: front_ranks(:)
      type(AbstractDataReferenceVector) :: MemdataRefPtrs
      type (SimpleCommSplitter) :: splitter
   contains
      procedure :: start
      procedure :: get_writing_PE
      procedure :: get_communicator
      procedure :: receive_output_data
      procedure :: create_remote_win
      procedure :: put_dataToFile
      procedure :: clean_up
   end type MultiCommServer

   interface MultiCommServer
      module procedure new_MultiCommServer
   end interface MultiCommServer

contains

   function new_MultiCommServer(server_comm, port_name, nwriter_per_node, rc) result(s)
      type (MultiCommServer) :: s
      integer, intent(in) :: server_comm
      character(*), intent(in) :: port_name
      integer, intent (in) :: nwriter_per_node
      integer, optional, intent(out) :: rc
      integer :: s_rank, s_size, nwriter
      integer :: ierror, status, local_rank
      type (SplitCommunicator)   ::  s_comm
      character(len=:), allocatable :: s_name
      integer :: MPI_STAT(MPI_STATUS_SIZE)
      type (MpiSocket), target :: dummy_socket
      integer, allocatable :: node_sizes(:)

      s%server_comm = server_comm
      call MPI_Comm_size(s%server_comm, s_size , ierror)
      _VERIFY(ierror)

      s%splitter = SimpleCommsplitter(s%server_comm)
      node_sizes = s%splitter%get_node_sizes()
      call s%splitter%add_group(npes_per_node = node_sizes(1)-nwriter_per_node, name="o_server_front", isolate_nodes=.false.)
      call s%splitter%add_group(npes_per_node = nwriter_per_node,               name="o_server_back",  isolate_nodes=.false.)

      s_comm = s%splitter%split(rc=status); _VERIFY(status)

      nwriter = nwriter_per_node*size(node_sizes)
      s%front_comm = MPI_COMM_NULL
      s%back_comm  = MPI_COMM_NULL
      s%nwriter    = nwriter
      s%nfront     = s_size - nwriter
      s%threads = ServerThreadVector()

      allocate(s%back_ranks(nwriter))
      allocate(s%front_ranks(s%nfront))
      call MPI_Comm_rank(s%server_comm, s_rank, ierror)
      _VERIFY(ierror)
      s_name = s_comm%get_name()
      s%I_am_front_root = .false.
      s%I_am_back_root = .false.
      if (index(s_name, 'o_server_front') /=0) then
         s%front_comm = s_comm%get_subcommunicator()
         call s%init(s%front_comm, s_name)
         s%port_name = trim(port_name)
         call MPI_Comm_rank(s%front_comm, local_rank, ierror)
         _VERIFY(ierror)
         if (s_rank == 0) then
           _ASSERT( local_rank == 0, "re-arrange the rank of the server_comm")
           s%I_am_front_root = .true.
           call MPI_recv(s%back_ranks, nwriter, MPI_INTEGER, MPI_ANY_SOURCE, 666, s%server_comm, MPI_STAT,ierror)
           _VERIFY(ierror)
         endif
         call MPI_Bcast(s%back_ranks, nwriter, MPI_INTEGER, 0, s%front_comm, ierror)
         _VERIFY(ierror)

         call MPI_AllGather(s_rank, 1, MPI_INTEGER, s%front_ranks, 1, MPI_INTEGER, s%front_comm, ierror)
         _VERIFY(ierror)
         if (local_rank ==0 ) then
            call MPI_Send(s%front_ranks, s_size-nwriter, MPI_INTEGER, s%back_ranks(1), 777, s%server_comm, ierror)
            _VERIFY(ierror)
         endif

      endif

      if (index(s_name, 'o_server_back') /=0) then
         s%back_comm = s_comm%get_subcommunicator()
         call MPI_AllGather(s_rank, 1, MPI_INTEGER, s%back_ranks, 1, MPI_INTEGER, s%back_comm, ierror)
         _VERIFY(ierror)
         call MPI_Comm_rank(s%back_comm, local_rank, ierror)
         _VERIFY(ierror)
         if (local_rank ==0 ) then
            s%I_am_back_root = .true.
            call MPI_Send(s%back_ranks, nwriter, MPI_INTEGER, 0, 666, s%server_comm, ierror)
            _VERIFY(ierror)
         endif

         if (s_rank == s%back_ranks(1)) then
            _ASSERT( local_rank == 0, "re-arrange the rank of the server_comm")
            call MPI_recv(s%front_ranks, s%nfront, MPI_INTEGER, MPI_ANY_SOURCE, 777, s%server_comm, MPI_STAT,ierror)
            _VERIFY(ierror)
         endif

         call MPI_Bcast(s%front_ranks, s%nfront, MPI_INTEGER, 0, s%back_comm, ierror)
         _VERIFY(ierror)
         call s%set_status(1)
         call s%add_connection(dummy_socket)
      endif
      _RETURN(_SUCCESS)
   end function new_MultiCommServer

   subroutine start(this, rc)
      class (MultiCommServer), target, intent(inout) :: this
      integer, optional, intent(out) :: rc
      integer :: status

      if ( this%front_comm /= MPI_COMM_NULL) then
         call start_front(_RC)
      endif

      if ( this%back_comm /= MPI_COMM_NULL) then
         call start_back(_RC)
      endif

      call this%splitter%free_sub_comm()
      _RETURN(_SUCCESS)
   contains

      subroutine start_back(rc)
         integer, optional, intent(out) :: rc
         integer :: ierr
         integer :: my_rank, cmd, status

         call MPI_Comm_rank(this%server_comm, my_rank, ierr)
         _VERIFY(ierr)
         allocate(this%serverthread_done_msgs(1))
         this%serverthread_done_msgs(:) = .false.

         do while (.true.)

            call MPI_Bcast(cmd, 1, MPI_INTEGER, 0, this%server_comm, ierr)
            _VERIFY(ierr)
            if (cmd == -1) exit
            call this%create_remote_win(_RC)
            call this%receive_output_data(_RC)
            call this%put_dataToFile(_RC)

            call this%clean_up(_RC)

         enddo
         _RETURN(_SUCCESS)
      end subroutine start_back

      subroutine start_front(rc)
         integer, optional, intent(out) :: rc
         class (ServerThread), pointer :: thread_ptr => null()
         integer :: i,client_size,ierr
         logical, allocatable :: mask(:)
         integer :: terminate = -1
         integer :: status

         client_size = this%threads%size()

         allocate(this%serverthread_done_msgs(client_size))
         this%serverthread_done_msgs(:) = .false.

         allocate(mask(client_size))
         mask = .false.
         ! loop untill terminate
         do while (.true.)

            do i = 1,client_size

               if ( mask(i)) cycle

               thread_ptr=>this%threads%at(i)
               !handle the message
               call thread_ptr%run(_RC)
               !delete the thread object if it terminates
               if(thread_ptr%do_terminate()) then
                  mask(i) = .true.
               endif
            enddo

            if (all(mask)) exit

         enddo

         call this%threads%clear()
         call MPI_Bcast(terminate, 1, MPI_INTEGER, 0, this%server_comm, ierr)
         _VERIFY(ierr)
         deallocate(mask)
         _RETURN(_SUCCESS)
      end subroutine start_front

   end subroutine start

   function get_writing_PE(this,id) result (rank)
      class(MultiCommServer),intent(in) :: this
      integer, intent(in) :: id
      integer :: rank, ith

      ith  = mod(id-1,this%nwriter)
      rank = this%back_ranks(ith+1)

   end function get_writing_PE

   subroutine create_remote_win(this, rc)
      class (MultiCommServer), target, intent(inout) :: this
      integer, optional, intent(out) :: rc
      class (AbstractDataReference), pointer :: remotePtr
      integer :: rank
      integer(KIND=INT64) :: msize_word
      integer(KIND=INT64),allocatable :: offsets(:), msize_words(:)
      type (MessageVectorIterator) :: iter
      type (StringInteger64MapIterator) :: request_iter
      class (AbstractMessage), pointer :: msg
      integer :: collection_counter, collection_total
      integer :: MPI_STAT(MPI_STATUS_SIZE)
      character(len=*),parameter :: Iam = 'create_remote_win'
      class (ServerThread),pointer :: thread_ptr
      integer :: bsize, ierr, status
      integer :: cmd = 1
      integer, allocatable :: buffer(:)

      if (this%front_comm /= MPI_COMM_NULL) then
         call MPI_Bcast(cmd, 1, MPI_INTEGER, 0, this%server_comm, ierr)
         _VERIFY(ierr)
      endif

      this%stage_offset = StringInteger64map()

      thread_ptr=> this%threads%at(1)

      if (this%I_am_front_root) then
         call serialize_message_vector(thread_ptr%request_backlog,buffer)
         bsize = size(buffer)
         call MPI_send(bsize,  1,     MPI_INTEGER, this%back_ranks(1), this%back_ranks(1), this%server_Comm, ierr)
         _VERIFY(ierr)
         call MPI_send(buffer, bsize, MPI_INTEGER, this%back_ranks(1), this%back_ranks(1), this%server_Comm, ierr)

         call HistoryCollectionVector_serialize(thread_ptr%hist_collections, buffer)
         bsize = size(buffer)
         call MPI_send(bsize,  1,     MPI_INTEGER, this%back_ranks(1), this%back_ranks(1), this%server_Comm, ierr)
         _VERIFY(ierr)
         call MPI_send(buffer, bsize, MPI_INTEGER, this%back_ranks(1), this%back_ranks(1), this%server_Comm, ierr)
         _VERIFY(ierr)

      endif

      if (this%back_comm /= MPI_COMM_NULL) then
         if (this%I_am_back_root) then
           call MPI_recv( bsize, 1, MPI_INTEGER,    &
             this%front_ranks(1), this%back_ranks(1), this%server_comm, &
             MPI_STAT, ierr)
           _VERIFY(ierr)
           allocate(buffer(bsize))
           call MPI_recv( buffer,bsize, MPI_INTEGER,    &
             this%front_ranks(1), this%back_ranks(1), this%server_comm, &
             MPI_STAT, ierr)
           _VERIFY(ierr)
         endif
         call MPI_Bcast(bsize, 1, MPI_INTEGER, 0, this%back_comm, ierr)
         _VERIFY(ierr)
         if (.not. allocated(buffer)) allocate(buffer(bsize))
         call MPI_Bcast(buffer, bsize, MPI_INTEGER, 0, this%back_comm, ierr)
         _VERIFY(ierr)
         call deserialize_message_vector(buffer, thread_ptr%request_backlog)
         deallocate (buffer)

         if (this%I_am_back_root) then
           call MPI_recv( bsize, 1, MPI_INTEGER,    &
             this%front_ranks(1), this%back_ranks(1), this%server_comm, &
             MPI_STAT, ierr)
           _VERIFY(ierr)
           allocate(buffer(bsize))
           call MPI_recv( buffer,bsize, MPI_INTEGER,    &
             this%front_ranks(1), this%back_ranks(1), this%server_comm, &
             MPI_STAT, ierr)
           _VERIFY(ierr)
         endif
         call MPI_Bcast(bsize, 1, MPI_INTEGER, 0, this%back_comm, ierr)
         _VERIFY(ierr)
         if (.not. allocated(buffer)) allocate(buffer(bsize))
         call MPI_Bcast(buffer, bsize, MPI_INTEGER, 0, this%back_comm, ierr)
         _VERIFY(ierr)
         call HistoryCollectionVector_deserialize(buffer, thread_ptr%hist_collections)
         deallocate (buffer)
      endif

      collection_counter = 0
      collection_total   = 0
      !(1) loop to get the total number of collection
      iter   = thread_ptr%request_backlog%begin()
      do while (iter /= thread_ptr%request_backlog%end())
         msg => iter%get()

         select type (q=>msg)
         type is (CollectiveStageDataMessage)
            request_iter = this%stage_offset%find(i_to_string(q%collection_id))
            if ( request_iter == this%stage_offset%end()) then
               collection_total = collection_total + 1
               call this%stage_offset%insert(i_to_string(q%collection_id),int(collection_total, kind=INT64))
            endif
         end select
         call iter%next()
      end do

     !(2) loop to get the total size and offset of each collection and request
      allocate(offsets(collection_total), msize_words(collection_total))
      offsets = 0
      iter   = thread_ptr%request_backlog%begin()
      do while (iter /= thread_ptr%request_backlog%end())
         msg => iter%get()

         select type (q=>msg)
         type is (CollectiveStageDataMessage)
            collection_counter = this%stage_offset%of(i_to_string(q%collection_id))
            request_iter = this%stage_offset%find(i_to_string(q%request_id))
            if ( request_iter == this%stage_offset%end()) then
               ! insert local offset for each node
               call this%stage_offset%insert(i_to_string(q%request_id),offsets(collection_counter))
               msize_word = word_size(q%type_kind)*product(int(q%global_count,INT64))
               offsets(collection_counter) = offsets(collection_counter) + msize_word
            endif
         end select
         call iter%next()
      end do
      msize_words = offsets

      do collection_counter = 1, collection_total

         rank = this%get_writing_PE(collection_counter)
         msize_word = msize_words(collection_counter)
         call this%stage_offset%insert(i_to_string(MSIZE_ID + collection_counter ),msize_word)

         allocate(remotePtr, source  = RDMAReference(pFIO_INT32,msize_word, this%server_comm, rank ))
         call this%add_DataReference(remotePtr)
         remotePtr=>null()
      enddo

      _RETURN(_SUCCESS)
   end subroutine create_remote_win

   subroutine put_DataToFile(this, rc)
     class (MultiCommServer),target, intent(inout) :: this
     integer, optional, intent(out) :: rc

     integer ::  n
     class (ServerThread),pointer :: threadPtr
     integer,pointer :: i_ptr(:)
     integer :: collection_counter
     class (AbstractDataReference), pointer :: dataRefPtr
     type (LocalMemReference), pointer :: memdataPtr=>null()
     integer(kind=MPI_ADDRESS_KIND) :: msize
     integer :: num_clients, l_rank, w_rank, ierr, empty(0), status
     !real(KIND=REAL64) :: t0, t1

     !t0 = 0.0d0
     !t1 = -1.0d0
     num_clients = this%threads%size()
     if (num_clients == 0) then
        _RETURN(_SUCCESS)
     endif


     if (this%back_comm /= MPI_COMM_NULL) then

        call MPI_comm_rank(this%back_comm, l_rank, ierr)
        _VERIFY(ierr)
        ! copy and save the data
        do collection_counter = 1, this%dataRefPtrs%size()
              dataRefPtr => this%get_dataReference(collection_counter)
              msize  = this%stage_offset%of(i_to_string(MSIZE_ID+collection_counter))
              call c_f_pointer(dataRefPtr%base_address,i_ptr,shape=[msize])
              w_rank = this%get_writing_PE(collection_counter)
              if ( w_rank == this%back_ranks(l_rank+1)) then !
                 allocate(memdataPtr, source = LocalMemReference(i_ptr))
                 call this%MemdataRefPtrs%push_back(memdataPtr)
              else
                 ! push null ptr, make sure the total is collection_total
                 allocate(memdataPtr, source = LocalMemReference(empty))
                 call this%MemdataRefPtrs%push_back(memdataPtr)
              endif
              memdataPtr => null()
         enddo

     endif

     if (this%front_comm /= MPI_COMM_NULL) then
        do n = 1, num_clients
           threadPtr=>this%threads%at(n)
           call threadPtr%clear_backlog()
           call threadPtr%clear_hist_collections()
           call threadPtr%clear_subarray()
        enddo ! threads
     endif
     !if( t1-t0 > 0) then
     !   print*, "this rank",this%rank,"spending ", t1-t0, " seconds writing"
     !endif
     _RETURN(_SUCCESS)
   end subroutine put_DataToFile

  subroutine clean_up(this, rc)
      class(MultiCommServer), target, intent(inout) :: this
      integer, optional, intent(out) :: rc
      class (ServerThread),pointer :: threadPtr
      class (AbstractMessage),pointer :: msg
      type (MessageVectorIterator) :: msg_iter
      integer,pointer :: i_ptr(:)
      type(c_ptr) :: offset_address
      integer :: collection_counter
      class (AbstractDataReference), pointer :: dataRefPtr
      integer(kind=MPI_ADDRESS_KIND) :: offset
      integer :: w_rank, l_rank, ierr, status
      type(StringInteger64MapIterator) :: iter


      call this%clear_DataReference()
      call this%clear_RequestHandle(_RC)
      call this%set_AllBacklogIsEmpty(.true.)
      this%serverthread_done_msgs(:) = .false.

      iter = this%prefetch_offset%begin()
      do while (iter /= this%prefetch_offset%end())
         call this%prefetch_offset%erase(iter)
         iter = this%prefetch_offset%begin()
      enddo

      if (this%front_Comm /= MPI_COMM_NULL) then
         iter = this%stage_offset%begin()
         do while (iter /= this%stage_offset%end())
            call this%stage_offset%erase(iter)
            iter = this%stage_offset%begin()
         enddo
      endif

      if (this%back_Comm /= MPI_COMM_NULL) then
        ! time to write file
         call MPI_comm_rank(this%back_comm, l_rank, ierr)
         _VERIFY(ierr)

         threadPtr=>this%threads%at(1)
         msg_iter = threadPtr%request_backlog%begin()
         do while (msg_iter /= threadPtr%request_backlog%end())
            msg => msg_iter%get()
            select type (q=>msg)
            type is (CollectiveStageDataMessage)
               collection_counter = this%stage_offset%of(i_to_string(q%collection_id))
               dataRefPtr => this%MemdataRefPtrs%at(collection_counter)

               select type(dataRefPtr)
               type is (LocalMemReference)
                 i_ptr =>dataRefPtr%i_ptr
               class default
                  _FAIL( "I expect localmemReference")
               end select

               iter = this%stage_offset%find(i_to_string(q%request_id)//'done')
               w_rank = this%get_writing_PE(collection_counter)
               if (iter == this%stage_offset%end() .and.  w_rank == this%back_ranks(l_rank+1)) then
                  ! (1) get address where data should put
                  offset     = this%stage_offset%at(i_to_string(q%request_id))
                  offset_address   = c_loc(i_ptr(offset+1))
                  ! (2) write data
                  call threadPtr%put_DataToFile(q,offset_address, _RC)
                  !  (3) leave a mark, it has been written
                  call this%stage_offset%insert(i_to_string(q%request_id)//'done',0_MPI_ADDRESS_KIND)
               endif
            class default
               _FAIL( "I expect CollectiveStageDataMessage")
            end select
            call msg_iter%next()
         enddo

         call threadPtr%clear_backlog()
         call threadPtr%clear_hist_collections()

         iter = this%stage_offset%begin()
         do while (iter /= this%stage_offset%end())
            call this%stage_offset%erase(iter)
            iter = this%stage_offset%begin()
         enddo

         do collection_counter =  1, this%MemdataRefPtrs%size()
            dataRefPtr => this%MemdataRefPtrs%at(collection_counter)
            call dataRefPtr%deallocate()
         enddo
         call this%MemdataRefPtrs%erase(this%MemdataRefPtrs%begin(), this%MemdataRefPtrs%end())
      end if

      _RETURN(_SUCCESS)
   end subroutine clean_up

   integer function get_communicator(this) result(communicator)
      class (MultiCommServer), intent(in) :: this

      communicator = this%front_comm

   end function get_communicator

  subroutine receive_output_data(this, rc)
     class (MultiCommServer),target, intent(inout) :: this
     integer, optional, intent(out) :: rc

     integer :: i, client_num, status
     class (ServerThread),pointer :: threadPtr
     class (AbstractDataReference), pointer :: dataRefPtr

     client_num = this%threads%size()

     if (this%front_comm /= MPI_COMM_NULL) then
       do i = 1, client_num
         threadPtr=>this%threads%at(i)
         call threadPtr%receive_output_data(rc=status)
         _VERIFY(status)
       enddo
     endif

     do i = 1, this%dataRefPtrs%size()
        dataRefPtr => this%get_dataReference(i)
        call dataRefPtr%fence(rc=status)
         _VERIFY(status)
     enddo
     _RETURN(_SUCCESS)
   end subroutine receive_output_data


end module pFIO_MultiCommServerMod