MpiSocket.F90 Source File


This file depends on

sourcefile~~mpisocket.f90~~EfferentGraph sourcefile~mpisocket.f90 MpiSocket.F90 sourcefile~abstractdatareference.f90 AbstractDataReference.F90 sourcefile~mpisocket.f90->sourcefile~abstractdatareference.f90 sourcefile~abstractmessage.f90 AbstractMessage.F90 sourcefile~mpisocket.f90->sourcefile~abstractmessage.f90 sourcefile~abstractrequesthandle.f90 AbstractRequestHandle.F90 sourcefile~mpisocket.f90->sourcefile~abstractrequesthandle.f90 sourcefile~abstractsocket.f90 AbstractSocket.F90 sourcefile~mpisocket.f90->sourcefile~abstractsocket.f90 sourcefile~mapl_exceptionhandling.f90 MAPL_ExceptionHandling.F90 sourcefile~mpisocket.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~pfio_constants.f90 pFIO_Constants.F90 sourcefile~mpisocket.f90->sourcefile~pfio_constants.f90 sourcefile~pfio_utilities.f90 pFIO_Utilities.F90 sourcefile~mpisocket.f90->sourcefile~pfio_utilities.f90 sourcefile~protocolparser.f90 ProtocolParser.F90 sourcefile~mpisocket.f90->sourcefile~protocolparser.f90 sourcefile~abstractdatareference.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~abstractdatareference.f90->sourcefile~pfio_constants.f90 sourcefile~abstractdatareference.f90->sourcefile~pfio_utilities.f90 sourcefile~abstractmessage.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~abstractrequesthandle.f90->sourcefile~abstractdatareference.f90 sourcefile~mapl_errorhandling.f90 MAPL_ErrorHandling.F90 sourcefile~mapl_exceptionhandling.f90->sourcefile~mapl_errorhandling.f90 sourcefile~mapl_throw.f90 MAPL_Throw.F90 sourcefile~mapl_exceptionhandling.f90->sourcefile~mapl_throw.f90 sourcefile~pfio_utilities.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~pfio_utilities.f90->sourcefile~pfio_constants.f90 sourcefile~protocolparser.f90->sourcefile~abstractmessage.f90 sourcefile~addextcollectionmessage.f90 AddExtCollectionMessage.F90 sourcefile~protocolparser.f90->sourcefile~addextcollectionmessage.f90 sourcefile~addhistcollectionmessage.f90 AddHistCollectionMessage.F90 sourcefile~protocolparser.f90->sourcefile~addhistcollectionmessage.f90 sourcefile~collectiveprefetchdatamessage.f90 CollectivePrefetchDataMessage.F90 sourcefile~protocolparser.f90->sourcefile~collectiveprefetchdatamessage.f90 sourcefile~collectiveprefetchdonemessage.f90 CollectivePrefetchDoneMessage.F90 sourcefile~protocolparser.f90->sourcefile~collectiveprefetchdonemessage.f90 sourcefile~collectivestagedatamessage.f90 CollectiveStageDataMessage.F90 sourcefile~protocolparser.f90->sourcefile~collectivestagedatamessage.f90 sourcefile~collectivestagedonemessage.f90 CollectiveStageDoneMessage.F90 sourcefile~protocolparser.f90->sourcefile~collectivestagedonemessage.f90 sourcefile~donemessage.f90 DoneMessage.F90 sourcefile~protocolparser.f90->sourcefile~donemessage.f90 sourcefile~dummymessage.f90 DummyMessage.F90 sourcefile~protocolparser.f90->sourcefile~dummymessage.f90 sourcefile~filemetadata.f90 FileMetadata.F90 sourcefile~protocolparser.f90->sourcefile~filemetadata.f90 sourcefile~forwarddatamessage.f90 ForwardDataMessage.F90 sourcefile~protocolparser.f90->sourcefile~forwarddatamessage.f90 sourcefile~handshakemessage.f90 HandShakeMessage.F90 sourcefile~protocolparser.f90->sourcefile~handshakemessage.f90 sourcefile~idmessage.f90 IDMessage.F90 sourcefile~protocolparser.f90->sourcefile~idmessage.f90 sourcefile~integermessagemap.f90 IntegerMessageMap.F90 sourcefile~protocolparser.f90->sourcefile~integermessagemap.f90 sourcefile~modifymetadatamessage.f90 ModifyMetadataMessage.F90 sourcefile~protocolparser.f90->sourcefile~modifymetadatamessage.f90 sourcefile~prefetchdatamessage.f90 PrefetchDataMessage.F90 sourcefile~protocolparser.f90->sourcefile~prefetchdatamessage.f90 sourcefile~prefetchdonemessage.f90 PrefetchDoneMessage.F90 sourcefile~protocolparser.f90->sourcefile~prefetchdonemessage.f90 sourcefile~replacemetadatamessage.f90 ReplaceMetadataMessage.F90 sourcefile~protocolparser.f90->sourcefile~replacemetadatamessage.f90 sourcefile~stagedatamessage.f90 StageDataMessage.F90 sourcefile~protocolparser.f90->sourcefile~stagedatamessage.f90 sourcefile~stagedonemessage.f90 StageDoneMessage.F90 sourcefile~protocolparser.f90->sourcefile~stagedonemessage.f90 sourcefile~terminatemessage.f90 TerminateMessage.F90 sourcefile~protocolparser.f90->sourcefile~terminatemessage.f90

Files dependent on this one

sourcefile~~mpisocket.f90~~AfferentGraph sourcefile~mpisocket.f90 MpiSocket.F90 sourcefile~baseserver.f90 BaseServer.F90 sourcefile~baseserver.f90->sourcefile~mpisocket.f90 sourcefile~directoryservice.f90 DirectoryService.F90 sourcefile~directoryservice.f90->sourcefile~mpisocket.f90 sourcefile~multicommserver.f90 MultiCommServer.F90 sourcefile~multicommserver.f90->sourcefile~mpisocket.f90 sourcefile~multigroupserver.f90 MultiGroupServer.F90 sourcefile~multigroupserver.f90->sourcefile~mpisocket.f90 sourcefile~pfio.f90 pFIO.F90 sourcefile~pfio.f90->sourcefile~mpisocket.f90 sourcefile~test_directoryservice.pf Test_DirectoryService.pf sourcefile~test_directoryservice.pf->sourcefile~mpisocket.f90 sourcefile~test_mpisocket.pf Test_MpiSocket.pf sourcefile~test_mpisocket.pf->sourcefile~mpisocket.f90

Source Code

#include "MAPL_ErrLog.h"
#include "unused_dummy.H"

module pFIO_MpiSocketMod
   use iso_c_binding
   use, intrinsic :: iso_fortran_env, only: INT64
   use MAPL_ExceptionHandling
   use pFIO_AbstractSocketMod
   use pFIO_AbstractRequestHandleMod
   use pFIO_AbstractMessageMod
   use pFIO_ProtocolParserMod
   use pFIO_AbstractDataReferenceMod
   use pFIO_ConstantsMod
   use pFIO_UtilitiesMod, only: word_size, i_to_string
   use mpi
   implicit none
   private

   public :: MpiSocket

   type, extends(AbstractSocket) :: MpiSocket
      private
      type (ProtocolParser), pointer :: parser
      integer :: world_comm
      integer :: pair_comm
      integer :: world_remote_rank
      integer :: world_local_rank
      integer :: pair_local_rank
      integer :: pair_remote_rank
   contains
      procedure :: receive
      procedure :: send
      procedure :: put
      procedure :: get
      procedure :: to_string
   end type MpiSocket

   interface MpiSocket
      module procedure new_MpiSocket
   end interface MpiSocket

   integer, parameter :: PAIR_TAG = 10
   integer, parameter :: MESSAGE_TAG = 11
   integer, parameter :: TAG_TAG = 12

   integer, parameter :: MIN_NONBLOCKING_TAG = 100
   integer, parameter :: MAX_NONBLOCKING_TAG = 199

   ! private type
   type, extends(AbstractRequestHandle) :: MpiRequestHandle
     private
     integer :: mpi_request
   contains
      procedure :: wait
   end type MpiRequestHandle
   
   interface MpiRequestHandle
      module procedure new_MpiRequestHandle
   end interface MpiRequestHandle

contains

   function new_MpiSocket(comm, remote_rank, parser, rc) result(s)
      type (MpiSocket) :: s
      integer, intent(in) :: comm
      integer, intent(in) :: remote_rank
      type (ProtocolParser), target, intent(in) :: parser
      integer, optional, intent(out) :: rc

      integer :: ierror
      integer :: local_rank
      integer :: pair_group
      integer :: world_group
      integer :: ranks(2)

      s%parser => parser
      s%world_comm = comm
      s%world_remote_rank = remote_rank

      call MPI_Comm_rank(comm, local_rank, ierror)
      s%world_local_rank = local_rank

      call MPI_Comm_group(comm, world_group, ierror)

      ! Enforce consistent ordering in new communicator/group
      if (local_rank < remote_rank) then
         ranks = [local_rank, remote_rank]
         s%pair_local_rank = 0
         s%pair_remote_rank = 1
      else
         ranks = [remote_rank, local_rank]
         s%pair_local_rank = 1
         s%pair_remote_rank = 0
      end if
      call MPI_Group_incl(world_group, 2, ranks, pair_group, ierror)
      call MPI_Comm_create_group(comm, pair_group, PAIR_TAG, s%pair_comm, ierror)
      _RETURN(_SUCCESS)
   end function new_MpiSocket

   function receive(this, rc) result(message)
      class (AbstractMessage), pointer :: message
      class (MpiSocket), intent(inout) :: this
      integer, optional, intent(out) :: rc

      integer, allocatable :: buffer(:)
      integer :: ierror
      integer :: status(MPI_STATUS_SIZE)
      integer :: count

      call MPI_Probe(this%pair_remote_rank, MESSAGE_TAG, this%pair_comm, status, ierror)
      call MPI_Get_count(status, MPI_INTEGER, count, ierror)

      allocate(buffer(count))
      call MPI_Recv(buffer, count, MPI_INTEGER, this%pair_remote_rank, MESSAGE_TAG, this%pair_comm, &
           & status, ierror)

      allocate(message, source=this%parser%decode(buffer))
      _RETURN(_SUCCESS)
   end function receive

   subroutine send(this, message, rc)
      class (MpiSocket), target, intent(inout) :: this
      class (AbstractMessage), intent(in) :: message
      integer, optional, intent(out) :: rc

      integer, allocatable :: buffer(:)
      integer :: ierror

      buffer = this%parser%encode(message)
      call MPI_Send(buffer, size(buffer), MPI_INTEGER, this%pair_remote_rank, MESSAGE_TAG, this%pair_comm, &
           & ierror)
      _RETURN(_SUCCESS)      
   end subroutine send


   function new_MpiRequestHandle(data_reference, mpi_request) result(handle)
      type (MpiRequestHandle) :: handle
      class (AbstractDatareference), intent(in) :: data_reference
      integer, intent(in) :: mpi_request

      allocate(handle%data_reference,source = data_reference)
      handle%mpi_request = mpi_request
   end function new_MpiRequestHandle

   function put(this, request_id, local_reference, rc) result(handle)
      class (AbstractRequestHandle), allocatable :: handle
      class (MpiSocket), intent(inout) :: this
      integer, intent(in) :: request_id
      class (AbstractDatareference), intent(in) :: local_reference
      integer, optional, intent(out) :: rc

      integer :: request
      integer :: ierror
      integer :: tag

      integer, pointer :: data(:)
      integer :: n_words
      integer(kind=INT64) :: big_n
      
      tag = make_tag(request_id)

      big_n   = product(int(local_reference%shape, INT64)) * word_size(local_reference%type_kind)
      _ASSERT( big_n < huge(0), "Increase the number of processors to decrease the local size of data to be sent")
      n_words = big_n
      call c_f_pointer(local_reference%base_address, data, shape=[n_words])
      if (n_words ==0) allocate(data(1))
      call MPI_Isend(data, n_words, MPI_INTEGER, this%pair_remote_rank, tag, this%pair_comm, request, ierror)
      allocate(handle, source=MpiRequestHandle(local_reference, request))
      if (n_words ==0) deallocate(data)
      _RETURN(_SUCCESS)
   end function put

   function get(this, request_id, local_reference, rc) result(handle)
      class (AbstractRequestHandle), allocatable :: handle
      class (MpiSocket), target, intent(inout) :: this
      integer, intent(in) :: request_id
      class (AbstractDataReference), intent(in) :: local_reference
      integer, optional, intent(out) :: rc

      integer :: tag
      integer :: ierror
      integer :: request

      integer, pointer :: data(:)
      integer :: n_words

      tag = make_tag(request_id)

      n_words = product(local_reference%shape) * word_size(local_reference%type_kind)
      call c_f_pointer(local_reference%base_address, data, shape=[n_words])
      if (n_words ==0) allocate(data(1))
      call MPI_Irecv(data, n_words, MPI_INTEGER, this%pair_remote_rank, tag, this%pair_comm, request, ierror)
      allocate(handle, source=MpiRequestHandle(local_reference, request))
      if (n_words ==0) deallocate(data)
      _RETURN(_SUCCESS)
   end function get

   subroutine wait(this, rc)
      class (MpiRequestHandle), target, intent(inout) :: this
      integer, optional, intent(out) :: rc

      integer :: ierror
      integer :: status(MPI_STATUS_SIZE)

      call MPI_Wait(this%mpi_request, status, ierror)
      _VERIFY(ierror)
      _RETURN(_SUCCESS)
   end subroutine wait

   integer function get_next_tag() result(tag)
      integer, save :: global_tag = MIN_NONBLOCKING_TAG

      tag = global_tag
      global_tag = MIN_NONBLOCKING_TAG + mod(global_tag + 1 - MIN_NONBLOCKING_TAG, MAX_NONBLOCKING_TAG - MIN_NONBLOCKING_TAG + 1)

   end function get_next_tag

   integer function make_tag(request_id) result(tag)
      integer, intent(in) :: request_id

      tag = request_id

   end function make_tag
      
   function to_string(this) result(string)
      class (MpiSocket), intent(in) :: this
      character(len=:), allocatable :: string
      
      string = 'MpiSocket::info' // new_line('a')
      string = string // '... world local rank:  ' // i_to_string(this%world_local_rank) // new_line('a')
      string = string // '... world remote rank: ' // i_to_string(this%world_remote_rank) // new_line('a')

   end function to_string

end module pFIO_MpiSocketMod