AbstractServer.F90 Source File


This file depends on

sourcefile~~abstractserver.f90~~EfferentGraph sourcefile~abstractserver.f90 AbstractServer.F90 sourcefile~abstractdatareference.f90 AbstractDataReference.F90 sourcefile~abstractserver.f90->sourcefile~abstractdatareference.f90 sourcefile~abstractmessage.f90 AbstractMessage.F90 sourcefile~abstractserver.f90->sourcefile~abstractmessage.f90 sourcefile~collectivestagedatamessage.f90 CollectiveStageDataMessage.F90 sourcefile~abstractserver.f90->sourcefile~collectivestagedatamessage.f90 sourcefile~dummymessage.f90 DummyMessage.F90 sourcefile~abstractserver.f90->sourcefile~dummymessage.f90 sourcefile~mapl_exceptionhandling.f90 MAPL_ExceptionHandling.F90 sourcefile~abstractserver.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~mapl_profiler.f90~2 MAPL_Profiler.F90 sourcefile~abstractserver.f90->sourcefile~mapl_profiler.f90~2 sourcefile~messagevector.f90 MessageVector.F90 sourcefile~abstractserver.f90->sourcefile~messagevector.f90 sourcefile~pfio_constants.f90 pFIO_Constants.F90 sourcefile~abstractserver.f90->sourcefile~pfio_constants.f90 sourcefile~rdmareference.f90 RDMAReference.F90 sourcefile~abstractserver.f90->sourcefile~rdmareference.f90 sourcefile~shmemreference.f90 ShmemReference.F90 sourcefile~abstractserver.f90->sourcefile~shmemreference.f90

Files dependent on this one

sourcefile~~abstractserver.f90~~AfferentGraph sourcefile~abstractserver.f90 AbstractServer.F90 sourcefile~baseserver.f90 BaseServer.F90 sourcefile~baseserver.f90->sourcefile~abstractserver.f90 sourcefile~directoryservice.f90 DirectoryService.F90 sourcefile~directoryservice.f90->sourcefile~abstractserver.f90 sourcefile~mpiserver.f90 MpiServer.F90 sourcefile~mpiserver.f90->sourcefile~abstractserver.f90 sourcefile~multicommserver.f90 MultiCommServer.F90 sourcefile~multicommserver.f90->sourcefile~abstractserver.f90 sourcefile~multigroupserver.f90 MultiGroupServer.F90 sourcefile~multigroupserver.f90->sourcefile~abstractserver.f90 sourcefile~multilayerserver.f90 MultiLayerServer.F90 sourcefile~multilayerserver.f90->sourcefile~abstractserver.f90 sourcefile~openmpserver.f90 OpenMPServer.F90 sourcefile~openmpserver.f90->sourcefile~abstractserver.f90 sourcefile~pfio.f90 pFIO.F90 sourcefile~pfio.f90->sourcefile~abstractserver.f90 sourcefile~serverthread.f90 ServerThread.F90 sourcefile~serverthread.f90->sourcefile~abstractserver.f90

Source Code

#include "MAPL_ErrLog.h"
#include "unused_dummy.H"

module pFIO_AbstractServerMod
   use MAPL_Profiler
   use MAPL_ExceptionHandling
   use pFIO_ConstantsMod
   use pFIO_AbstractDataReferenceMod
   use pFIO_AbstractDataReferenceVectorMod
   use pFIO_ShmemReferenceMod
   use gFTL_StringInteger64Map
   use pFIO_AbstractMessageMod
   use pFIO_CollectiveStageDataMessageMod
   use pFIO_RDMAReferenceMod
   use pFIO_DummyMessageMod
   use pFIO_MessageVectorMod
   use mpi

   implicit none
   private
   public :: AbstractServer
   public :: ioserver_profiler
   type (DistributedProfiler), pointer :: ioserver_profiler=>null()

   integer,parameter,public :: MAX_SERVER_NODES_NUM = 100000
   integer,parameter,public :: MSIZE_ID = - MAX_SERVER_NODES_NUM
   integer,parameter,public :: UNALLOCATED = -100
   integer,parameter,public :: PENDING     = -200

   type,abstract :: AbstractServer
      integer :: comm   ! of all server processes
      integer :: rank   ! rank in all server processes
      integer :: npes   ! number of processes of the server
      integer :: status ! counter, UNALLOCATED, PENDING
      logical :: all_backlog_is_empty = .true.
      integer :: num_clients = 0
      logical :: terminate
      integer :: InNode_Comm  ! communicator in a node
      integer :: InNode_npes  ! number of processes in a node, it may differ across server nodes
      integer :: InNode_Rank  ! rank in a node
      integer :: NodeRoot_Comm! communicator of server nodes' roots
      integer :: Node_Num     ! number of server nodes
      integer :: Node_Rank    ! rank of server nodes

      ! save info about which process belongs to which node
      ! all processes keep this info
      integer,allocatable    :: Node_Ranks(:)
      type(StringInteger64Map) :: prefetch_offset
      type(StringInteger64Map) :: stage_offset
      logical , allocatable :: serverthread_done_msgs(:)
      type(AbstractDataReferenceVector) :: dataRefPtrs
   contains
      procedure :: init
      procedure(start),deferred :: start
      procedure(get_dmessage), deferred :: get_dmessage
      procedure(clear_RequestHandle), deferred :: clear_RequestHandle
      procedure(set_collective_request), deferred :: set_collective_request
      procedure(create_remote_win), deferred :: create_remote_win
      procedure :: get_status
      procedure :: set_status
      procedure :: update_status
      procedure :: clean_up
      procedure :: set_AllBacklogIsEmpty
      procedure :: get_AllBacklogIsEmpty
      procedure :: get_DataReference
      procedure :: add_DataReference
      procedure :: clear_DataReference
      procedure :: I_am_NodeRoot
      procedure :: I_am_ServerRoot
      procedure :: receive_output_data
      procedure :: put_DataToFile
      procedure :: get_DataFromMem
      procedure :: am_I_reading_PE
      procedure :: get_writing_PE
      procedure :: distribute_task
      procedure :: get_communicator
      procedure :: report_profile
   end type AbstractServer

   abstract interface

      subroutine start(this, rc)
         use pFIO_AbstractSocketVectorMod
         import AbstractServer
         class(AbstractServer),target,intent(inout) :: this
         integer, optional, intent(out) :: rc
      end subroutine start

      subroutine clear_RequestHandle(this, rc)
         import AbstractServer
         class(AbstractServer),target,intent(inout) :: this
         integer, optional, intent(out) :: rc
      end subroutine clear_RequestHandle

      subroutine set_collective_request(this, request, have_done)
         import AbstractServer
         class(AbstractServer),target,intent(inout) :: this
         logical, intent(in) :: request, have_done
      end subroutine set_collective_request

      function get_dmessage(this, rc) result(dmessage)
         import AbstractServer
         import AbstractMessage
         class (AbstractServer), target, intent(in) :: this
         integer, optional, intent(out) :: rc
         class(AbstractMessage), pointer :: dmessage
      end function

      subroutine create_remote_win(this,rc)
         import AbstractServer
         class(AbstractServer),target,intent(inout) :: this
         integer, optional, intent(out) :: rc
      end subroutine create_remote_win

   end interface

contains

   subroutine init(this,comm, port_name, profiler_name, with_profiler, rc)
      class (AbstractServer),intent(inout) :: this
      integer, intent(in) :: comm
      character(*), intent(in) :: port_name
      character(*), optional, intent(in) :: profiler_name
      logical, optional, intent(in) :: with_profiler
      integer, optional, intent(out) :: rc

      integer :: ierror, MyColor
      character(len=:), allocatable :: p_name

      this%comm = comm
      call MPI_Comm_rank(this%comm, this%rank, ierror)
      _VERIFY(ierror)
      call MPI_Comm_size(this%comm, this%npes, ierror)
      _VERIFY(ierror)

      this%num_clients = 0
      this%all_backlog_is_empty = .true.
      this%status = UNALLOCATED

      call MPI_Comm_split_type(this%comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, this%InNode_Comm,ierror)
      _VERIFY(ierror)
      call MPI_Comm_size(this%InNode_Comm, this%InNode_npes,ierror)
      _VERIFY(ierror)
      call MPI_Comm_rank(this%InNode_Comm, this%InNode_Rank,ierror)
      _VERIFY(ierror)

      MyColor = 0
      if (this%InNode_Rank == 0) MyColor = 1

      call MPI_COMM_SPLIT( comm, MyColor, this%rank, this%NodeRoot_Comm, ierror)
      _VERIFY(ierror)

      if (MyColor==0) then
         this%NodeRoot_Comm=MPI_COMM_NULL
      else
         call MPI_COMM_SIZE(this%NodeRoot_Comm, this%Node_Num, ierror)
         _VERIFY(ierror)
         call MPI_COMM_RANK(this%NodeRoot_Comm, this%Node_Rank, ierror)
         _VERIFY(ierror)
      endif

      call Mpi_Bcast(this%Node_Rank, 1, MPI_INTEGER, 0, this%InNode_Comm, ierror)
      _VERIFY(ierror)
      call Mpi_Bcast(this%Node_Num,  1, MPI_INTEGER, 0, this%InNode_Comm, ierror)
      _VERIFY(ierror)

      allocate(This%Node_Ranks(0:this%npes-1))

      call Mpi_AllGather(this%Node_Rank,  1, MPI_INTEGER, &
                         this%Node_Ranks, 1, MPI_INTEGER, comm,ierror)
      _VERIFY(ierror)

      if (present(profiler_name)) then
         p_name = profiler_name
      else
         p_name = port_name
      endif

      if (present(with_profiler)) then
         if (with_profiler) then
            if (.not. associated(ioserver_profiler)) then
               allocate(ioserver_profiler, source = DistributedProfiler(p_name, MpiTimerGauge(), comm))
               call ioserver_profiler%start()
            endif
         endif
      endif

      call MPI_Barrier(comm, ierror)
      _VERIFY(ierror)
      _RETURN(_SUCCESS)
   end subroutine init

   function get_status(this) result(status)
      class(AbstractServer),intent(in) :: this
      integer :: status

      !$omp critical (counter_status)
      status = this%status
      !$omp end critical (counter_status)
   end function get_status

   subroutine set_status(this,status)
      class(AbstractServer),intent(inout) :: this
      integer,intent(in) :: status

      !$omp critical (counter_status)
      this%status = status
      !$omp flush (this)
      !$omp end critical (counter_status)
   end subroutine  set_status

   subroutine update_status(this, rc)
      class(AbstractServer),intent(inout) :: this
      integer, optional, intent(out) :: rc
      integer :: status
      type(StringInteger64MapIterator) :: iter
      !$omp critical (counter_status)
      this%status = this%status -1
      status = this%status
      !$omp flush (this)
      !$omp end critical (counter_status)
      if (status /= 0) then
        _RETURN(_SUCCESS)
      endif
      ! status ==0, means the last server thread in the backlog

      call this%clear_DataReference()
      call this%clear_RequestHandle(_RC)
      call this%set_status(UNALLOCATED)
      call this%set_AllBacklogIsEmpty(.true.)

      iter = this%prefetch_offset%begin()
      do while (iter /= this%prefetch_offset%end())
         call this%prefetch_offset%erase(iter)
         iter = this%prefetch_offset%begin()
      enddo

      iter = this%stage_offset%begin()
      do while (iter /= this%stage_offset%end())
         call this%stage_offset%erase(iter)
         iter = this%stage_offset%begin()
      enddo
      this%serverthread_done_msgs(:) = .false.

      _RETURN(_SUCCESS)
   end subroutine update_status

   subroutine clean_up(this, rc)
      class(AbstractServer), target, intent(inout) :: this
      integer, optional, intent(out) :: rc
      type(StringInteger64MapIterator) :: iter
      integer :: status

      if (associated(ioserver_profiler)) call ioserver_profiler%start("clean_up")

      call this%clear_DataReference()
      Call this%clear_RequestHandle(_RC)
      call this%set_AllBacklogIsEmpty(.true.)
      this%serverthread_done_msgs(:) = .false.

      iter = this%prefetch_offset%begin()
      do while (iter /= this%prefetch_offset%end())
         call this%prefetch_offset%erase(iter)
         iter = this%prefetch_offset%begin()
      enddo

      iter = this%stage_offset%begin()
      do while (iter /= this%stage_offset%end())
         call this%stage_offset%erase(iter)
         iter = this%stage_offset%begin()
      enddo

      if (associated(ioserver_profiler)) call ioserver_profiler%stop("clean_up")

      _RETURN(_SUCCESS)
   end subroutine clean_up

   function get_AllBacklogIsEmpty(this) result(status)
      class(AbstractServer),intent(in) :: this
      logical :: status

      !$omp critical (backlog_status)
      status = this%all_backlog_is_empty
      !$omp end critical (backlog_status)
   end function get_AllBacklogIsEmpty

   subroutine set_AllBacklogIsEmpty(this,status)
      class(AbstractServer),intent(inout) :: this
      logical :: status

      !$omp critical (backlog_status)
      this%all_backlog_is_empty = status
      !$omp flush (this)
      !$omp end critical (backlog_status)
   end subroutine set_AllBacklogIsEmpty

   function I_am_NodeRoot(this) result (yes)
      class (AbstractServer), intent(in) :: this
      logical :: yes
      yes = (this%NodeRoot_Comm /= MPI_COMM_NULL)
   end function

   function I_am_ServerRoot(this) result (yes)
      class (AbstractServer), intent(in) :: this
      logical :: yes
      yes = (this%rank == 0)
   end function I_am_ServerRoot

   subroutine receive_output_data(this, rc)
     class (AbstractServer),target, intent(inout) :: this
     integer, optional, intent(out) :: rc

     _FAIL(" no action of receive_output_data")
     _UNUSED_DUMMY(this)
   end subroutine receive_output_data

   subroutine put_DataToFile(this, rc)
     class (AbstractServer),target, intent(inout) :: this
     integer, optional, intent(out) :: rc
     _FAIL(" no action of server_put_DataToFile")
     _UNUSED_DUMMY(this)
   end subroutine put_DataToFile

   subroutine get_DataFromMem(this,multi, rc)
     class (AbstractServer),target, intent(inout) :: this
     logical, intent(in) :: multi
     integer, optional, intent(out) :: rc
     _FAIL(" no action of server_get_DataFromMem")
     _UNUSED_DUMMY(this)
     _UNUSED_DUMMY(multi)
   end subroutine get_DataFromMem

   function am_I_reading_PE(this,id) result (yes)
      class(AbstractServer),intent(in) :: this
      integer, intent(in) :: id
      integer :: node_rank,innode_rank
      logical :: yes

      call this%distribute_task(id, node_rank,innode_rank)
      yes = (node_rank   == this%Node_Rank) .and. &
            (innode_rank == this%InNode_Rank)
   end function am_I_reading_PE

   ! distribute the task (id) to a specific process (node_rank, innode_rank)
   subroutine distribute_task(this,id, node_rank, innode_rank)
      class(AbstractServer),intent(in) :: this
      integer, intent(in)   :: id
      integer,intent(out) :: node_rank,innode_rank
      integer :: rank

      innode_rank = mod(id, this%InNode_npes)
      rank        = mod(id, this%npes)
      node_rank   = this%Node_Ranks(rank)

     _UNUSED_DUMMY(this)
   end subroutine distribute_task

   function get_writing_PE(this,id) result (rank)
      class(AbstractServer),intent(in) :: this
      integer, intent(in) :: id
      integer :: rank
      integer :: rank_tmp, ierror, rc

      integer :: node_rank,innode_rank
      logical :: yes

      node_rank   = mod(id-1,this%node_num)
      innode_rank = mod(id-1,this%innode_npes)

      yes = (node_rank   == this%Node_Rank) .and. &
            (innode_rank == this%InNode_Rank)
      rank_tmp = 0
      rank = 0
      if (yes) rank_tmp = this%rank
      call Mpi_Allreduce(rank_tmp,rank,1, MPI_INTEGER, MPI_SUM, this%comm, ierror)
      _VERIFY(ierror)

   end function get_writing_PE

   function get_DataReference(this, ith) result(DataRef)
      class (AbstractServer),target, intent(in) :: this
      integer,optional, intent(in) :: ith
      class(AbstractDataReference),pointer :: DataRef
      integer :: i
      i=1
      if(present(ith)) i = ith
      DataRef => this%dataRefPtrs%at(i)

   end function get_DataReference

   subroutine add_DataReference(this,DataRef)
      class (AbstractServer), intent(inout) :: this
      class(AbstractDataReference),target,intent(in) :: DataRef

      call this%dataRefPtrs%push_back(DataRef)
      call this%set_status(this%num_clients)

   end subroutine add_DataReference

   subroutine clear_DataReference(this)
      class (AbstractServer), target, intent(inout) :: this
      class (AbstractDataReference), pointer :: datarefPtr
      integer :: n, i

      n = this%dataRefPtrs%size()
      do i = 1, n
         dataRefPtr => this%dataRefPtrs%at(i)
         call dataRefPtr%deallocate()
      enddo
      call this%dataRefPtrs%erase(this%dataRefPtrs%begin(), this%dataRefPtrs%end())

   end subroutine clear_DataReference

   integer function get_communicator(this) result(communicator)
      class (AbstractServer), intent(in) :: this

      communicator = this%comm

   end function get_communicator

   subroutine report_profile(this, rc )
      class (AbstractServer), intent(inout) :: this
      integer, optional,   intent(  out) :: RC     ! Error code:
      character(:), allocatable :: report_lines(:)
      type (ProfileReporter) :: reporter
      character(1) :: empty(0)
      integer :: i, status

      if ( .not. associated(ioserver_profiler)) then
         _RETURN(_SUCCESS)
      endif

      call ioserver_profiler%stop(_RC)
      call ioserver_profiler%reduce()

      reporter = ProfileReporter(empty)
      call reporter%add_column(NameColumn(20))
      call reporter%add_column(FormattedTextColumn('Inclusive','(f10.2)', 10, InclusiveColumn('MEAN')))
      call reporter%add_column(FormattedTextColumn('% Incl','(f6.2)', 6, PercentageColumn(InclusiveColumn('MEAN'),'MAX')))
      call reporter%add_column(FormattedTextColumn('Exclusive','(f10.2)', 10, ExclusiveColumn('MEAN')))
      call reporter%add_column(FormattedTextColumn('% Excl','(f6.2)', 6, PercentageColumn(ExclusiveColumn('MEAN'))))
      call reporter%add_column(FormattedTextColumn(' Max Excl)','(f10.2)', 10, ExclusiveColumn('MAX')))
      call reporter%add_column(FormattedTextColumn(' Min Excl)','(f10.2)', 10, ExclusiveColumn('MIN')))
      call reporter%add_column(FormattedTextColumn('Max PE)','(1x,i4.4,1x)', 6, ExclusiveColumn('MAX_PE')))
      call reporter%add_column(FormattedTextColumn('Min PE)','(1x,i4.4,1x)', 6, ExclusiveColumn('MIN_PE')))
      report_lines = reporter%generate_report(ioserver_profiler)

      if (this%rank == 0) then
         write(*,'(a)')'Final io_server profile'
         write(*,'(a)')'============='
         do i = 1, size(report_lines)
            write(*,'(a)') report_lines(i)
         end do
         write(*,'(a)') ''
      end if

      deallocate(ioserver_profiler)
      _RETURN(_SUCCESS)
   end subroutine report_profile

end module pFIO_AbstractServerMod