BaseServer.F90 Source File


This file depends on

sourcefile~~baseserver.f90~~EfferentGraph sourcefile~baseserver.f90 BaseServer.F90 sourcefile~abstractdatamessage.f90 AbstractDataMessage.F90 sourcefile~baseserver.f90->sourcefile~abstractdatamessage.f90 sourcefile~abstractdatareference.f90 AbstractDataReference.F90 sourcefile~baseserver.f90->sourcefile~abstractdatareference.f90 sourcefile~abstractmessage.f90 AbstractMessage.F90 sourcefile~baseserver.f90->sourcefile~abstractmessage.f90 sourcefile~abstractserver.f90 AbstractServer.F90 sourcefile~baseserver.f90->sourcefile~abstractserver.f90 sourcefile~abstractsocket.f90 AbstractSocket.F90 sourcefile~baseserver.f90->sourcefile~abstractsocket.f90 sourcefile~abstractsocketvector.f90 AbstractSocketVector.F90 sourcefile~baseserver.f90->sourcefile~abstractsocketvector.f90 sourcefile~collectivestagedatamessage.f90 CollectiveStageDataMessage.F90 sourcefile~baseserver.f90->sourcefile~collectivestagedatamessage.f90 sourcefile~donemessage.f90 DoneMessage.F90 sourcefile~baseserver.f90->sourcefile~donemessage.f90 sourcefile~dummymessage.f90 DummyMessage.F90 sourcefile~baseserver.f90->sourcefile~dummymessage.f90 sourcefile~mapl_exceptionhandling.f90 MAPL_ExceptionHandling.F90 sourcefile~baseserver.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~messagevector.f90 MessageVector.F90 sourcefile~baseserver.f90->sourcefile~messagevector.f90 sourcefile~mpisocket.f90 MpiSocket.F90 sourcefile~baseserver.f90->sourcefile~mpisocket.f90 sourcefile~pfio_constants.f90 pFIO_Constants.F90 sourcefile~baseserver.f90->sourcefile~pfio_constants.f90 sourcefile~pfio_utilities.f90 pFIO_Utilities.F90 sourcefile~baseserver.f90->sourcefile~pfio_utilities.f90 sourcefile~rdmareference.f90 RDMAReference.F90 sourcefile~baseserver.f90->sourcefile~rdmareference.f90 sourcefile~serverthread.f90 ServerThread.F90 sourcefile~baseserver.f90->sourcefile~serverthread.f90 sourcefile~serverthreadvector.f90 ServerThreadVector.F90 sourcefile~baseserver.f90->sourcefile~serverthreadvector.f90 sourcefile~shmemreference.f90 ShmemReference.F90 sourcefile~baseserver.f90->sourcefile~shmemreference.f90 sourcefile~simplesocket.f90 SimpleSocket.F90 sourcefile~baseserver.f90->sourcefile~simplesocket.f90

Files dependent on this one

sourcefile~~baseserver.f90~~AfferentGraph sourcefile~baseserver.f90 BaseServer.F90 sourcefile~abstractdirectoryservice.f90 AbstractDirectoryService.F90 sourcefile~abstractdirectoryservice.f90->sourcefile~baseserver.f90 sourcefile~directoryservice.f90 DirectoryService.F90 sourcefile~directoryservice.f90->sourcefile~baseserver.f90 sourcefile~mockserver.f90 MockServer.F90 sourcefile~mockserver.f90->sourcefile~baseserver.f90 sourcefile~mpiserver.f90 MpiServer.F90 sourcefile~mpiserver.f90->sourcefile~baseserver.f90 sourcefile~multicommserver.f90 MultiCommServer.F90 sourcefile~multicommserver.f90->sourcefile~baseserver.f90 sourcefile~multigroupserver.f90 MultiGroupServer.F90 sourcefile~multigroupserver.f90->sourcefile~baseserver.f90 sourcefile~multilayerserver.f90 MultiLayerServer.F90 sourcefile~multilayerserver.f90->sourcefile~baseserver.f90 sourcefile~openmpserver.f90 OpenMPServer.F90 sourcefile~openmpserver.f90->sourcefile~baseserver.f90 sourcefile~pfio.f90 pFIO.F90 sourcefile~pfio.f90->sourcefile~baseserver.f90

Source Code

#include "MAPL_ErrLog.h"
#include "unused_dummy.H"

module pFIO_BaseServerMod
   use, intrinsic :: iso_c_binding, only: c_ptr
   use, intrinsic :: iso_c_binding, only: c_loc
   use, intrinsic :: iso_fortran_env, only: INT64
   use, intrinsic :: iso_c_binding, only: c_f_pointer
   use MAPL_ExceptionHandling
   use pFIO_UtilitiesMod, only: word_size, i_to_string
   use pFIO_ConstantsMod
   use pFIO_ServerThreadMod
   use pFIO_ServerThreadVectorMod
   use pFIO_AbstractSocketMod
   use pFIO_AbstractSocketVectorMod
   use pFIO_AbstractServerMod
   use gFTL_StringInteger64Map
   use pFIO_AbstractMessageMod
   use pFIO_AbstractDataReferenceMod
   use pFIO_ShmemReferenceMod
   use pFIO_RDMAReferenceMod
   use pFIO_AbstractDataMessageMod
   use pFIO_MpiSocketMod
   use pFIO_SimpleSocketMod
   use pFIO_MessageVectorMod
   use pFIO_MessageVectorUtilMod
   use pFIO_DummyMessageMod
   use pFIO_DoneMessageMod
   use pFIO_CollectiveStageDataMessageMod
   use mpi
!   use pfio_base

   implicit none
   private

   public :: BaseServer

   type,extends (AbstractServer), abstract :: BaseServer
      type (ServerThreadVector) :: threads
   contains

      procedure :: receive_output_data
      procedure :: put_DataToFile
      procedure :: get_DataFromMem
      procedure :: add_connection
      procedure :: clear_RequestHandle
      procedure :: get_dmessage ! get done or dummy message
      procedure :: set_collective_request !
      procedure :: create_remote_win
   end type BaseServer

contains

   subroutine receive_output_data(this, rc)
     class (BaseServer),target, intent(inout) :: this
     integer, optional, intent(out) :: rc

     integer :: i, client_num, status
     class (ServerThread),pointer :: threadPtr
     class (AbstractDataReference), pointer :: dataRefPtr

     if (associated(ioserver_profiler)) call ioserver_profiler%start("receive_data")
     client_num = this%threads%size()

     do i = 1, client_num
         threadPtr=>this%threads%at(i)
         ! receive output data and save them on Shmem_reference
         call threadPtr%receive_output_data(rc=status)
         _VERIFY(status)
     enddo

     do i = 1, this%dataRefPtrs%size()
        dataRefPtr => this%get_dataReference(i)
        call dataRefPtr%fence(rc=status)
         _VERIFY(status)
     enddo

     if (associated(ioserver_profiler)) call ioserver_profiler%stop("receive_data")
     _RETURN(_SUCCESS)
   end subroutine receive_output_data

   subroutine put_DataToFile(this, rc)
     class (BaseServer),target, intent(inout) :: this
     integer, optional, intent(out) :: rc

     integer ::  n
     class (ServerThread),pointer :: threadPtr
     class (AbstractMessage),pointer :: msg
     type (MessageVectorIterator) :: iter
     type (StringInteger64MapIterator) :: request_iter
     integer,pointer :: i_ptr(:)
     type(c_ptr) :: offset_address
     integer :: collection_counter
     class (AbstractDataReference), pointer :: dataRefPtr
     class (RDMAReference), pointer :: remotePtr
     integer(kind=MPI_ADDRESS_KIND) :: offset, msize
     integer :: num_clients, status
     !real(KIND=REAL64) :: t0, t1

     !t0 = 0.0d0
     !t1 = -1.0d0
     num_clients = this%threads%size()
     if (num_clients == 0) then
        _RETURN(_SUCCESS)
     endif

     if (associated(ioserver_profiler)) call ioserver_profiler%start("write_data")

     threadPtr=>this%threads%at(1)

     iter = threadPtr%request_backlog%begin()
     ! t0 = mpi_wtime()
     do while (iter /= threadPtr%request_backlog%end())
        msg => iter%get()
        select type (q=>msg)
        type is (CollectiveStageDataMessage)
           if (associated(ioserver_profiler)) call ioserver_profiler%start("collection_"//i_to_string(q%collection_id))
           collection_counter = this%stage_offset%of(i_to_string(q%collection_id))
           dataRefPtr => this%get_dataReference(collection_counter)
           msize  = this%stage_offset%of(i_to_string(MSIZE_ID+collection_counter))
           call c_f_pointer(dataRefPtr%base_address,i_ptr,shape=[msize])

           select type(dataRefPtr)
           type is (RDMAReference)
              remotePtr=>dataRefPtr
           class default
              _FAIL( "remote is a must")
           end select

           request_iter = this%stage_offset%find(i_to_string(q%request_id)//'done')
           if (request_iter == this%stage_offset%end() .and. this%rank == remotePtr%mem_rank ) then ! not read yet
             !print*, this%rank , " is wrinting this collection ", collection_counter
             ! (1) get address where data should put
              offset     = this%stage_offset%at(i_to_string(q%request_id))
              offset_address   = c_loc(i_ptr(offset+1))
             ! (2) write data
              call threadPtr%put_DataToFile(q,offset_address, _RC)
             ! (3) leave a mark, it has been written
              call this%stage_offset%insert(i_to_string(q%request_id)//'done',0_MPI_ADDRESS_KIND)
              !t1 = mpi_wtime()
           endif ! rank = mem_rank
           if (associated(ioserver_profiler)) call ioserver_profiler%stop("collection_"//i_to_string(q%collection_id))
        end select
        call iter%next()
     enddo ! do backlog loop

     do n = 1, num_clients
        threadPtr=>this%threads%at(n)
        call threadPtr%clear_backlog()
        call threadPtr%clear_hist_collections()
        call threadPtr%clear_subarray()
     enddo ! threads
     !if( t1-t0 > 0) then
     !   print*, "this rank",this%rank,"spending ", t1-t0, " seconds writing"
     !endif
     if (associated(ioserver_profiler)) call ioserver_profiler%stop("write_data")
     _RETURN(_SUCCESS)
   end subroutine put_DataToFile

   subroutine get_DataFromMem(this, multi, rc)
     class (BaseServer),target, intent(inout) :: this
     logical, intent(in) :: multi
     integer, optional, intent(out) :: rc

     integer :: i, client_num, status
     class (ServerThread),pointer :: threadPtr

     client_num = this%threads%size()

     do i = 1, client_num
         threadPtr=>this%threads%at(i)
         ! get data from Shmem_reference and send it back to app
         call threadPtr%get_DataFromMem(multi, rc=status)
         _VERIFY(status)
     enddo

     _RETURN(_SUCCESS)
   end subroutine get_DataFromMem

   subroutine add_connection(this, socket)
      class (BaseServer), target, intent(inout) :: this
      class (AbstractSocket), target, intent(in) :: socket

      class(ServerThread), pointer :: thread_ptr
      integer :: k

      allocate(thread_ptr, source=ServerThread(socket, this))
      k = this%threads%size() + 1
      call thread_ptr%set_rank(k)
      call this%threads%push_Back(thread_ptr)
      nullify(thread_ptr)

      this%num_clients = this%num_clients + 1

   end subroutine add_connection

   ! done message of dummy message

   function get_dmessage(this, rc) result(dmessage)
      class (BaseServer), target, intent(in) :: this
      integer, optional, intent(out) :: rc

      class(AbstractMessage), pointer :: dmessage
      class(ServerThread), pointer :: thread_ptr
      integer :: n

      n = this%threads%size()
      if (n <= 0 ) then
         allocate(dmessage,source = DummyMessage())
         print*, "WARNING: no serverthread"
         _RETURN(_SUCCESS)
      endif

      thread_ptr=>this%threads%at(1)
      select type (socket => thread_ptr%get_connection())
      type is (SimpleSocket)
         allocate(dmessage,source = DoneMessage())
      type is (MpiSocket)
         allocate(dmessage,source = DummyMessage())
      class default
         _FAIL( "wrong socket type")
      end select

      _RETURN(_SUCCESS)
   end function

   subroutine clear_RequestHandle(this)
      class (BaseServer), target, intent(inout) :: this
      class(ServerThread), pointer :: thread_ptr
      integer :: i,n


      n = this%threads%size()

      do i = 1, n
         thread_ptr => this%threads%at(i)
         call thread_ptr%clear_RequestHandle()
      enddo


   end subroutine clear_RequestHandle

   subroutine set_collective_request(this, request, have_done)
      class (BaseServer), target, intent(inout) :: this
      logical, intent(in) :: request, have_done

      class(ServerThread), pointer :: thread_ptr
      integer :: i,n

      n = this%threads%size()

      do i = 1, n
         thread_ptr=>this%threads%at(i)
         call thread_ptr%set_collective_request(request, have_done)
      enddo

   end subroutine set_collective_request

   subroutine create_remote_win(this, rc)
      class (BaseServer), target, intent(inout) :: this
      integer, optional, intent(out) :: rc
      class (AbstractDataReference), pointer :: remotePtr
      integer :: rank
      integer(KIND=INT64) :: msize_word
      integer(KIND=INT64),allocatable :: offsets(:), msize_words(:)
      type (MessageVectorIterator) :: iter
      type (StringInteger64MapIterator) :: request_iter
      class (AbstractMessage), pointer :: msg
      integer :: collection_counter, collection_total
      character(len=*),parameter :: Iam = 'create_remote_win'
      class (ServerThread) , pointer :: thread_ptr

      if (associated(ioserver_profiler)) call ioserver_profiler%start("create_shared_mem")

      this%stage_offset = StringInteger64map()

      collection_counter = 0
      collection_total   = 0

      thread_ptr => this%threads%at(1)

      !(1) loop to get the total number of collection
      iter   = thread_ptr%request_backlog%begin()
      do while (iter /= thread_ptr%request_backlog%end())
         msg => iter%get()

         select type (q=>msg)
         type is (CollectiveStageDataMessage)
            request_iter = this%stage_offset%find(i_to_string(q%collection_id))
            if ( request_iter == this%stage_offset%end()) then
               collection_total = collection_total + 1
               call this%stage_offset%insert(i_to_string(q%collection_id),int(collection_total, kind=INT64))
            endif
         end select
         call iter%next()
      end do

      !(2) loop to get the total size and offset of each collection and request
      allocate(offsets(collection_total), msize_words(collection_total))
      offsets = 0
      iter   = thread_ptr%request_backlog%begin()
      do while (iter /= thread_ptr%request_backlog%end())
         msg => iter%get()

         select type (q=>msg)
         type is (CollectiveStageDataMessage)
            collection_counter = this%stage_offset%of(i_to_string(q%collection_id))
            request_iter = this%stage_offset%find(i_to_string(q%request_id))
            if ( request_iter == this%stage_offset%end()) then
               ! insert local offset for each node
               call this%stage_offset%insert(i_to_string(q%request_id),offsets(collection_counter))
               msize_word = word_size(q%type_kind)*product(int(q%global_count,INT64))
               offsets(collection_counter) = offsets(collection_counter) + msize_word
            endif
         end select
         call iter%next()
      end do
      ! (3) allocate the memory
      msize_words = offsets
      do collection_counter = 1, collection_total
         rank = this%get_writing_PE(collection_counter)
         msize_word = msize_words(collection_counter)
         call this%stage_offset%insert(i_to_string(MSIZE_ID + collection_counter ),msize_word)
         allocate(remotePtr, source  = RDMAReference(pFIO_INT32,msize_word, this%comm, rank ))
         call this%add_DataReference(remotePtr)
         remotePtr=>null()
      enddo

      if (associated(ioserver_profiler)) call ioserver_profiler%stop("create_shared_mem")
      _RETURN(_SUCCESS)

   end subroutine create_remote_win

end module pFIO_BaseServerMod