ClientThread.F90 Source File


This file depends on

ClientThread.F90 depends on the following source files: AbstractDataReference.F90, AbstractMessage.F90, AbstractRequestHandle.F90, AbstractSocket.F90, AddExtCollectionMessage.F90, AddHistCollectionMessage.F90, BaseThread.F90, CollectivePrefetchDataMessage.F90, CollectivePrefetchDoneMessage.F90, CollectiveStageDataMessage.F90, CollectiveStageDoneMessage.F90, DoneMessage.F90, DummyMessage.F90, FileMetadata.F90, HandShakeMessage.F90, IDMessage.F90, IntegerRequestMap.F90, MAPL_ExceptionHandling.F90, MAPL_KeywordEnforcer.F90, MessageVisitor.F90, ModifyMetadataMessage.F90, PrefetchDataMessage.F90, PrefetchDoneMessage.F90, ReplaceMetadataMessage.F90, SimpleSocket.F90, StageDataMessage.F90, StageDoneMessage.F90, StringVariableMap.F90, TerminateMessage.F90.

Files dependent on this one

The following source files depend on ClientThread.F90: ClientManager.F90, ClientThreadVector.F90, DirectoryService.F90, FastClientThread.F90, MockClient.F90, MockClientThread.F90, pFIO.F90, Test_SimpleSocket.pf.

Source Code

#include "MAPL_ErrLog.h"
#include "unused_dummy.H"

module pFIO_ClientThreadMod
   use MAPL_ExceptionHandling
   use pFIO_AbstractMessageMod
   use pFIO_AbstractSocketMod
   use pFIO_AbstractRequestHandleMod
   use pFIO_IntegerRequestMapMod
   use pFIO_MessageVisitorMod
   use pFIO_BaseThreadMod
   use pFIO_AbstractDataReferenceMod
   use mapl_KeywordEnforcerMod
   use pFIO_SimpleSocketMod
   use pFIO_FileMetadataMod

   use pFIO_TerminateMessageMod
   use pFIO_DoneMessageMod
   use pFIO_DummyMessageMod
   use pFIO_HandShakeMessageMod
   use pFIO_PrefetchDoneMessageMod
   use pFIO_CollectivePrefetchDoneMessageMod
   use pFIO_StageDoneMessageMod
   use pFIO_CollectiveStageDoneMessageMod
   use pFIO_AddExtCollectionMessageMod
   use pFIO_AddHistCollectionMessageMod
   use pFIO_IdMessageMod
   use pFIO_PrefetchDataMessageMod
   use pFIO_StageDataMessageMod
   use pFIO_CollectivePrefetchDataMessageMod
   use pFIO_CollectiveStageDataMessageMod
   use pFIO_ModifyMetadataMessageMod
   use pFIO_ReplaceMetadataMessageMod
   use pFIO_StringVariableMapMod

   implicit none
   private

   public :: ClientThread

   ! Inclusive bounds of the id range produced by get_unique_request_id.
   ! The counter wraps back to MIN_ID after MAX_ID has been handed out.
   integer, parameter :: MIN_ID = 1000
   integer, parameter :: MAX_ID = 9999

   ! Inclusive bounds of the separate id range produced by
   ! get_unique_collective_request_id (same wrap-around scheme).
   integer, parameter :: COLLECTIVE_MIN_ID = 10000
   integer, parameter :: COLLECTIVE_MAX_ID = 19999


   ! Client side of the request protocol.  Extends BaseThread with the
   ! procedures that send requests over the socket returned by
   ! get_connection, and with the counters used to number those requests.
   type, extends(BaseThread) :: ClientThread
      private

      ! scratch pad for return values from application level interfaces
      integer :: collection_id      = -1
      ! advanced by get_unique_request_id; holds the most recent id once
      ! one has been issued
      integer :: request_counter    = MIN_ID
      ! advanced by get_unique_collective_request_id in the same way
      integer :: collective_counter = COLLECTIVE_MIN_ID

   contains
      ! collection registration and metadata updates
      procedure :: add_ext_collection
      procedure :: add_hist_collection
      procedure :: modify_metadata
      procedure :: replace_metadata

      ! prefetch requests (handle obtained from the connection get call)
      procedure :: prefetch_data
      procedure :: collective_prefetch_data

      ! stage requests (handle obtained from the connection put call)
      procedure :: stage_data
      procedure :: collective_stage_data
      procedure :: stage_nondistributed_data

      ! plain notifications sent to the server
      procedure :: shake_hand
      procedure :: done
      procedure :: done_prefetch
      procedure :: done_collective_prefetch
      procedure :: done_stage
      procedure :: done_collective_stage
      procedure :: terminate

      ! completion of outstanding request handles
      procedure :: wait
      procedure :: wait_all
      procedure :: post_wait_all

      ! incoming message handling
      procedure :: handle_Id

      ! request id generation
      procedure :: get_unique_request_id
      procedure :: get_unique_collective_request_id
   end type ClientThread


   ! Generic constructor: ClientThread(...) resolves to new_ClientThread.
   interface ClientThread
      module procedure new_ClientThread
   end interface ClientThread

contains

   ! Constructor behind the generic ClientThread interface.  When a socket
   ! is supplied it is installed through set_connection; when it is absent
   ! no connection is set here.
   function new_ClientThread(sckt) result(c)
      class (AbstractSocket), optional, intent(in) :: sckt
      type (ClientThread), target :: c

      if (present(sckt)) then
         call c%set_connection(sckt)
      end if

   end function new_ClientThread

   ! Handler for an incoming IdMessage.  The message is currently ignored
   ! (the assignment to collection_id is disabled) and the routine simply
   ! reports success.
   subroutine handle_Id(this, message, rc)
      class (ClientThread), intent(inout) :: this
      type (IdMessage),     intent(in)    :: message
      integer, optional,    intent(out)   :: rc

      ! this%collection_id = message%id
      _RETURN(_SUCCESS)

      ! neither dummy argument is referenced above
      _UNUSED_DUMMY(message)
      _UNUSED_DUMMY(this)
   end subroutine handle_Id

   ! Register an external collection with the server and return the id the
   ! server replies with.
   !
   !   template - forwarded unchanged to AddExtCollectionMessage
   !   rc       - optional return code; reports failure when the send fails
   !              or when the reply is not an IDMessage
   !
   ! The result is left undefined when a failure is reported.
   function add_ext_collection(this, template, rc) result(collection_id)
      integer :: collection_id
      class (ClientThread), intent(inout) :: this
      character(len=*), intent(in) :: template
      integer, optional, intent(out) :: rc

      class (AbstractMessage), pointer :: message
      class(AbstractSocket),pointer :: connection
      logical :: got_id
      integer :: status

      connection=>this%get_connection()
      call connection%send(AddExtCollectionMessage(template),_RC)

      message => connection%receive()
      got_id = .false.
      select type(message)
      type is(IDMessage)
        collection_id = message%id
        got_id = .true.
      end select
      ! The reply is owned by the caller (the other request procedures of
      ! this module release theirs as well), so free it on every path
      ! before reporting the outcome.
      deallocate(message)

      if (.not. got_id) then
        _FAIL( " should get id message")
      end if
      _RETURN(_SUCCESS)
   end function add_ext_collection

   ! Register a history collection, described by file metadata, with the
   ! server and return the id the server replies with.
   !
   !   fmd      - file metadata forwarded to AddHistCollectionMessage
   !   unusable - keyword enforcer; not referenced
   !   mode     - optional, forwarded to AddHistCollectionMessage
   !   rc       - optional return code; reports failure when the send fails
   !              or when the reply is not an IDMessage
   !
   ! The result is left undefined when a failure is reported.
   function add_hist_collection(this, fmd, unusable,  mode, rc) result(hist_collection_id)
      integer :: hist_collection_id
      class (ClientThread), target, intent(inout) :: this
      type(FileMetadata),intent(in) :: fmd
      class (KeywordEnforcer), optional, intent(out) :: unusable
      integer, optional, intent(in) :: mode
      integer, optional, intent(out) :: rc

      class (AbstractMessage), pointer :: message
      class(AbstractSocket), pointer :: connection
      logical :: got_id
      integer :: status

      connection=>this%get_connection()
      ! A failed send is now propagated to the caller, as in the other
      ! request procedures of this module.
      call connection%send(AddHistCollectionMessage(fmd, mode=mode),_RC)

      message => connection%receive()
      got_id = .false.
      select type(message)
      type is(IDMessage)
        hist_collection_id = message%id
        got_id = .true.
      end select
      ! The reply is owned by the caller, so free it on every path before
      ! reporting the outcome.
      deallocate(message)

      if (.not. got_id) then
        _FAIL( " should get id message")
      end if

      _RETURN(_SUCCESS)
      _UNUSED_DUMMY(unusable)
   end function add_hist_collection

   ! Issue a prefetch request for one variable of a file in a collection.
   ! A new request id is drawn, a PrefetchDataMessage is sent, the reply of
   ! the server is received and discarded, and the handle produced by the
   ! connection get call is stored under the id.  The id is the result.
   !
   !   unusable, start - optional, forwarded to the message constructor
   !   rc              - optional return code; reports a failed send
   function prefetch_data(this, collection_id, file_name, var_name, data_reference, &
        & unusable, start, rc) result(request_id)
      integer :: request_id
      class (ClientThread), intent(inout) :: this
      integer, intent(in) :: collection_id
      character(len=*), intent(in) :: file_name
      character(len=*), intent(in) :: var_name
      class (AbstractDataReference), intent(in) :: data_reference
      class (KeywordEnforcer), optional, intent(out) :: unusable
      integer, optional, intent(in)  :: start(:)
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      request_id = this%get_unique_request_id()
      sock => this%get_connection()

      call sock%send( &
           PrefetchDataMessage(request_id, collection_id, file_name, var_name, &
           data_reference, unusable=unusable, start=start), &
           _RC)

      ! the reply only acknowledges the request; its content is not used
      reply => sock%receive()
      deallocate(reply)

      ! the get call iRecv
      call this%insert_RequestHandle(request_id, sock%get(request_id, data_reference))

      _RETURN(_SUCCESS)
   end function prefetch_data

   ! Send a ModifyMetadataMessage for the given collection and consume the
   ! reply of the server.
   !
   !   unusable - keyword enforcer; not referenced
   !   var_map  - optional, forwarded to the message constructor
   !   rc       - optional return code; reports a failed send
   subroutine modify_metadata(this, collection_id, unusable,var_map, rc)
      class (ClientThread), intent(inout) :: this
      integer, intent(in) :: collection_id
      class (KeywordEnforcer), optional, intent(out) :: unusable
      type (StringVariableMap), optional, intent(in) :: var_map
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      sock => this%get_connection()
      call sock%send(ModifyMetadataMessage(collection_id, var_map=var_map), _RC)

      ! the reply is received only to be released
      reply => sock%receive()
      deallocate(reply)

      _RETURN(_SUCCESS)
      _UNUSED_DUMMY(unusable)
   end subroutine modify_metadata

   ! Send a ReplaceMetadataMessage carrying new file metadata for the given
   ! collection and consume the reply of the server.
   !
   !   rc - optional return code; reports a failed send
   subroutine replace_metadata(this, collection_id, fmd, rc)
      class (ClientThread), intent(inout) :: this
      integer, intent(in) :: collection_id
      type (FileMetadata), intent(in) :: fmd
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      sock => this%get_connection()
      call sock%send(ReplaceMetadataMessage(collection_id, fmd), _RC)

      ! the reply is received only to be released
      reply => sock%receive()
      deallocate(reply)

      _RETURN(_SUCCESS)
   end subroutine replace_metadata

   ! Collective variant of prefetch_data.  The id comes from the collective
   ! id range, a CollectivePrefetchDataMessage is sent, the reply of the
   ! server is received and discarded, and the handle produced by the
   ! connection get call is stored under the id.  The id is the result.
   !
   !   unusable, start, global_start, global_count
   !        - optional, forwarded to the message constructor
   !   rc   - optional return code; reports a failed send
   function collective_prefetch_data(this, collection_id, file_name, var_name, data_reference, &
        & unusable, start,global_start,global_count, rc) result(request_id)
      integer :: request_id
      class (ClientThread), intent(inout) :: this
      integer, intent(in) :: collection_id
      character(len=*), intent(in) :: file_name
      character(len=*), intent(in) :: var_name
      class (AbstractDataReference), intent(in) :: data_reference
      class (KeywordEnforcer), optional, intent(out) :: unusable
      integer, optional, intent(in) :: start(:)
      integer, optional, intent(in) :: global_start(:)
      integer, optional, intent(in) :: global_count(:)
      integer, optional, intent(out):: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      request_id = this%get_unique_collective_request_id()
      sock => this%get_connection()

      call sock%send( &
           CollectivePrefetchDataMessage(request_id, collection_id, file_name, var_name, &
           data_reference, unusable=unusable, start=start, &
           global_start=global_start, global_count=global_count), &
           _RC)

      ! the reply only acknowledges the request; its content is not used
      reply => sock%receive()
      deallocate(reply)

      ! the get call iRecv
      call this%insert_RequestHandle(request_id, sock%get(request_id, data_reference))

      _RETURN(_SUCCESS)
   end function collective_prefetch_data

   ! Issue a stage request for one variable of a file in a collection.
   ! A new request id is drawn, a StageDataMessage is sent, the reply of the
   ! server is received and discarded, and the handle produced by the
   ! connection put call is stored under the id.  The id is the result.
   !
   !   unusable, start - optional, forwarded to the message constructor
   !   rc              - optional return code; reports a failed send
   function stage_data(this, collection_id, file_name, var_name, data_reference, &
        & unusable, start, rc) result(request_id)
      integer :: request_id
      class (ClientThread), intent(inout) :: this
      integer, intent(in) :: collection_id
      character(len=*), intent(in) :: file_name
      character(len=*), intent(in) :: var_name
      class (AbstractDataReference), intent(in) :: data_reference
      class (KeywordEnforcer), optional, intent(out) :: unusable
      integer, optional, intent(in)  :: start(:)
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      request_id = this%get_unique_request_id()
      sock => this%get_connection()

      call sock%send( &
           StageDataMessage(request_id, collection_id, file_name, var_name, &
           data_reference, unusable=unusable, start=start), &
           _RC)

      ! the reply only acknowledges the request; its content is not used
      reply => sock%receive()
      deallocate(reply)

      ! the put call iSend
      call this%insert_RequestHandle(request_id, sock%put(request_id, data_reference))

      _RETURN(_SUCCESS)
   end function stage_data

   ! Collective variant of stage_data.  The id comes from the collective id
   ! range, a CollectiveStageDataMessage is sent, the reply of the server is
   ! received and discarded, and the handle produced by the connection put
   ! call is stored under the id.  The id is the result.
   !
   !   unusable, start, global_start, global_count
   !        - optional, forwarded to the message constructor
   !   rc   - optional return code; reports a failed send
   function collective_stage_data(this, collection_id, file_name, var_name, data_reference, &
        & unusable, start,global_start,global_count, rc) result(request_id)
      integer :: request_id
      class (ClientThread), target, intent(inout) :: this
      integer, intent(in) :: collection_id
      character(len=*), intent(in) :: file_name
      character(len=*), intent(in) :: var_name
      class (AbstractDataReference), intent(in) :: data_reference
      class (KeywordEnforcer), optional, intent(out) :: unusable
      integer, optional, intent(in) :: start(:)
      integer, optional, intent(in) :: global_start(:)
      integer, optional, intent(in) :: global_count(:)
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      request_id = this%get_unique_collective_request_id()
      sock => this%get_connection()

      call sock%send( &
           CollectiveStageDataMessage(request_id, collection_id, file_name, var_name, &
           data_reference, unusable=unusable, start=start, &
           global_start=global_start, global_count=global_count), &
           _RC)

      ! the reply only acknowledges the request; its content is not used
      reply => sock%receive()
      deallocate(reply)

      ! the put call iSend
      call this%insert_RequestHandle(request_id, sock%put(request_id, data_reference))

      _RETURN(_SUCCESS)
   end function collective_stage_data

   ! Stage a variable using a CollectiveStageDataMessage built without the
   ! optional start, global_start and global_count arguments.  The id comes
   ! from the collective id range; the reply of the server is received and
   ! discarded, and the handle produced by the connection put call is
   ! stored under the id.  The id is the result.
   !
   !   rc - optional return code; reports a failed send
   function stage_nondistributed_data(this, collection_id, file_name, var_name, data_reference, rc) result(request_id)
      class (ClientThread), intent(inout) :: this
      integer, intent(in) :: collection_id
      character(len=*), intent(in) :: file_name
      character(len=*), intent(in) :: var_name
      class (AbstractDataReference), intent(in) :: data_reference
      integer, optional, intent(out) :: rc


      integer :: request_id

      class (AbstractMessage), pointer :: handshake_msg
      class(AbstractSocket),pointer :: connection
      integer :: status

      request_id = this%get_unique_collective_request_id()
      connection => this%get_connection()
      ! A failed send is now propagated to the caller instead of being
      ! silently ignored, as in stage_data and collective_stage_data.
      call connection%send(CollectiveStageDataMessage( &
           request_id, &
           collection_id, &
           file_name, &
           var_name, &
           data_reference),_RC)

      ! the reply only acknowledges the request; its content is not used
      handshake_msg => connection%receive()
      deallocate(handshake_msg)
      associate (id => request_id)
        ! the put call iSend
        call this%insert_RequestHandle(id, connection%put(id, data_reference))
      end associate
      _RETURN(_SUCCESS)
   end function stage_nondistributed_data

   ! Send a HandShakeMessage to the server and consume its reply.
   !
   !   rc - optional return code; reports a failed send
   subroutine shake_hand(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      class (AbstractMessage), pointer :: reply
      integer :: status

      sock => this%get_connection()
      call sock%send(HandShakeMessage(), _RC)

      ! the reply is received only to be released
      reply => sock%receive()
      deallocate(reply)

      _RETURN(_SUCCESS)
   end subroutine shake_hand
   ! Notify the server, with a DoneMessage, that this ClientThread has
   ! finished issuing new requests for now.  This lets the server be more
   ! responsive during the requests phase of operations.
   !
   !   rc - optional return code; reports a failed send
   subroutine done(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      integer :: status

      sock => this%get_connection()
      call sock%send(DoneMessage(), _RC)

      _RETURN(_SUCCESS)
   end subroutine done

   ! Send a PrefetchDoneMessage to the server, unless no request handle is
   ! currently stored, in which case nothing is sent.
   !
   !   rc - optional return code; reports a failed send
   subroutine done_prefetch(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      integer :: status

      if (.not. this%isEmpty_RequestHandle()) then
         sock => this%get_connection()
         call sock%send(PrefetchDoneMessage(), _RC)
      end if

      _RETURN(_SUCCESS)
   end subroutine done_prefetch

   ! Send a CollectivePrefetchDoneMessage to the server, unless no request
   ! handle is currently stored, in which case nothing is sent.
   !
   !   rc - optional return code; reports a failed send
   subroutine done_collective_prefetch(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      integer :: status

      if (.not. this%isEmpty_RequestHandle()) then
         sock => this%get_connection()
         call sock%send(CollectivePrefetchDoneMessage(), _RC)
      end if

      _RETURN(_SUCCESS)
   end subroutine done_collective_prefetch

   ! Send a StageDoneMessage to the server, unless no request handle is
   ! currently stored, in which case nothing is sent.
   !
   !   rc - optional return code; reports a failed send
   subroutine done_stage(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      integer :: status

      if (.not. this%isEmpty_RequestHandle()) then
         sock => this%get_connection()
         call sock%send(StageDoneMessage(), _RC)
      end if

      _RETURN(_SUCCESS)
   end subroutine done_stage

   ! Send a CollectiveStageDoneMessage to the server, unless no request
   ! handle is currently stored, in which case nothing is sent.
   !
   !   rc - optional return code; reports a failed send
   subroutine done_collective_stage(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      integer :: status

      if (.not. this%isEmpty_RequestHandle()) then
         sock => this%get_connection()
         call sock%send(CollectiveStageDoneMessage(), _RC)
      end if

      _RETURN(_SUCCESS)
   end subroutine done_collective_stage

   ! Complete a single outstanding request: look up the handle stored under
   ! request_id, wait on it, release its data reference and remove the
   ! handle from the collection of this thread.
   !
   !   rc - optional return code; set to success at the end
   subroutine wait(this, request_id, rc)
      use pFIO_AbstractRequestHandleMod
      class (ClientThread), target, intent(inout) :: this
      integer, intent(in) :: request_id
      integer, optional, intent(out) :: rc

      class (AbstractRequestHandle), pointer :: pending
      integer :: status

      pending => this%get_RequestHandle(request_id)

      call pending%wait()
      call pending%data_reference%deallocate()

      ! the handle is erased only after its data reference was released
      call this%erase_RequestHandle(request_id)

      _RETURN(_SUCCESS)
   end subroutine wait

   ! Process every stored request handle by delegating to
   ! clear_RequestHandle.
   !
   !   rc - optional return code; reports a failure of that call
   subroutine wait_all(this, rc)
      use pFIO_AbstractRequestHandleMod
      class (ClientThread), target, intent(inout) :: this
      integer, optional, intent(out) :: rc

      integer :: status

      call this%clear_RequestHandle(_RC)
      ! call this%shake_hand()

      _RETURN(_SUCCESS)
   end subroutine wait_all

   ! Thin wrapper that forwards to wait_all.
   !
   !   rc - optional return code; reports a failure of wait_all
   subroutine post_wait_all(this, rc)
      use pFIO_AbstractRequestHandleMod
      class (ClientThread), target, intent(inout) :: this
      integer, optional, intent(out) :: rc

      integer :: status

      call this%wait_all(_RC)

      _RETURN(_SUCCESS)
   end subroutine post_wait_all

   ! Advance request_counter to the next id of the range MIN_ID..MAX_ID,
   ! wrapping from MAX_ID back to MIN_ID, and return the new value.
   integer function get_unique_request_id(this) result(request_id)
      class (ClientThread), intent(inout) :: this

      ! number of distinct ids in the range
      integer, parameter :: span = MAX_ID - MIN_ID + 1

      request_id = MIN_ID + mod(this%request_counter + 1 - MIN_ID, span)
      this%request_counter = request_id
   end function get_unique_request_id

   ! Advance collective_counter to the next id of the range
   ! COLLECTIVE_MIN_ID..COLLECTIVE_MAX_ID, wrapping from the upper bound
   ! back to the lower bound, and return the new value.
   integer function get_unique_collective_request_id(this) result(request_id)
      class (ClientThread), intent(inout) :: this

      ! number of distinct ids in the range
      integer, parameter :: span = COLLECTIVE_MAX_ID - COLLECTIVE_MIN_ID + 1

      request_id = COLLECTIVE_MIN_ID + mod(this%collective_counter + 1 - COLLECTIVE_MIN_ID, span)
      this%collective_counter = request_id
   end function get_unique_collective_request_id

   ! Send a TerminateMessage to the server.
   !
   !   rc - optional return code; reports a failed send
   subroutine terminate(this, rc)
      class (ClientThread), intent(inout) :: this
      integer, optional, intent(out) :: rc

      class (AbstractSocket), pointer :: sock
      integer :: status

      sock => this%get_connection()
      call sock%send(TerminateMessage(), _RC)

      _RETURN(_SUCCESS)
   end subroutine terminate

end module pFIO_ClientThreadMod