pfio_ctest_io.F90 Source File


This file depends on

sourcefile~~pfio_ctest_io.f90~~EfferentGraph sourcefile~pfio_ctest_io.f90 pfio_ctest_io.F90 sourcefile~mapl_exceptionhandling.f90 MAPL_ExceptionHandling.F90 sourcefile~pfio_ctest_io.f90->sourcefile~mapl_exceptionhandling.f90 sourcefile~pfio.f90 pFIO.F90 sourcefile~pfio_ctest_io.f90->sourcefile~pfio.f90 sourcefile~pflogger_stub.f90 pflogger_stub.F90 sourcefile~pfio_ctest_io.f90->sourcefile~pflogger_stub.f90

Source Code

!usage
!mpirun -np 8 ./pfio_collective_demo.x -nc 4 -nsi 2 -nso 2  -f1 xxx1.nc4 -f2 xxx2.nc4 -v T -s mpi
!The variable should be 4d with lavel>=20
#include "MAPL_ErrLog.h"
#include "unused_dummy.H"

module ctest_io_CLI
   use MAPL_ExceptionHandling
   use pFIO
   use gFTL_StringVector
   use gFTL_StringIntegerMap
   implicit none
   private

   public :: CommandLineOptions0
   public :: process_command_line
   public :: DirectoryServicePointer
 
   type CommandLineOptions0
      type (StringVector) :: requested_variables

      integer :: npes_client
      integer :: npes_iserver
      integer :: npes_oserver

      integer :: N_ig ! number of isever group
      integer :: N_og ! nuber of  osever group
      integer :: N_writer ! number of writer

      logical :: debug
      character(len=:),allocatable :: server_type ! 'mpi' or 'openmp'
      character(len=:),allocatable :: writer ! 'mpi' or 'openmp'
   end type CommandLineOptions0

   type DirectoryServicePointer
     class(AbstractDirectoryService),pointer :: dsPtr=>null()
   end type

   integer, public :: c1,c2,c3,c4,c0

contains

   ! The following procedure parses the command line to find various
   ! arguments for file names, target grid resolution, etc.
   subroutine process_command_line(options, rc)
      type (CommandLineOptions0), intent(inout) :: options
      integer, optional, intent(out) :: rc

      integer :: n_args
      integer :: i_arg
      character(len=:), allocatable :: argument
      character(len=:), allocatable :: buffer

      n_args = command_argument_count()

      options%N_writer = 0
      options%writer   = ''
      i_arg = 0
      do
         if (i_arg > n_args) exit

         argument = get_next_argument()

         select case (argument)
         case ('-nc', '--npes_client')
            buffer = get_next_argument()
            _ASSERT(buffer /= '-', "no extrea - ")
            read(buffer,*) options%npes_client
         case ('-nsi', '--npes_iserver')
            buffer = get_next_argument()
            _ASSERT(buffer /= '-', "no extrea - ")
            read(buffer,*) options%npes_iserver
         case ('-ngi', '--ng_iserver')
            buffer = get_next_argument()
            _ASSERT(buffer /= '-', "no extrea - ")
            read(buffer,*) options%N_ig
         case ('-nso', '--npes_oserver')
            buffer = get_next_argument()
            _ASSERT(buffer /= '-', "no extrea - ")
            read(buffer,*) options%npes_oserver
         case ('-ngo', '--ng_oserver')
            buffer = get_next_argument()
            _ASSERT(buffer /= '-', "no extrea - ")
            read(buffer,*) options%N_og
         case ('-nw', '--n_writer')
            buffer = get_next_argument()
            _ASSERT(buffer /= '-', "no extrea - ")
            read(buffer,*) options%N_writer
         case ('-w', '--pfio_writer')
            options%writer = get_next_argument()
            _ASSERT(options%server_type /= '-', "no extrea - ")
         case ('-v', '--var')
            buffer = get_next_argument()
            _ASSERT(buffer(1:1) /= '-', "no extrea - ")
            options%requested_variables = parse_vars(buffer)
         case ('-s', '--server_type')
            options%server_type = get_next_argument()
            _ASSERT(options%server_type /= '-', "no extrea - ")
         case ('-d', '--debug')
            options%debug = .true.
         case default
            ! ignore
         end select

      end do
      _RETURN(_SUCCESS)
   contains
      
      function get_next_argument() result(argument)
         character(len=:), allocatable :: argument
         
         integer :: length
         
         i_arg = i_arg + 1
         
         call get_command_argument(i_arg, length=length)
         allocate(character(len=length) :: argument)
         call get_command_argument(i_arg, value=argument)
         
      end function get_next_argument

      function parse_vars(buffer) result(vars)
         type (StringVector) :: vars
         character(len=*), intent(in) :: buffer

         integer :: idx
         character(len=1), parameter :: COMMA = ','
         character(len=:), allocatable :: string

         string = buffer // COMMA
         do
            if (len(string) == 0) exit
            idx = index(string,COMMA)
            call vars%push_back(string(1:idx-1))
            string = string(idx+1:)
         end do

      end function parse_vars


   end subroutine process_command_line
   

end module ctest_io_CLI

module FakeHistData0Mod
   use MAPL_ExceptionHandling
   use ctest_io_CLI
   use pFIO
   use gFTL_StringVector
   use gFTL_StringIntegerMap
   use, intrinsic :: iso_c_binding, only: c_f_pointer, c_loc
   use, intrinsic :: iso_fortran_env, only: REAL32
   implicit none
   private

   public :: FakeHistData0

   type FakeBundle
      real(kind=REAL32), allocatable :: x(:,:,:,:,:)
      integer :: request_id
   end type FakeBundle

   type FakeHistData0
      type (ClientThread) :: i_c
      type (ClientThread) :: o_c
      type (ClientThreadVector) :: ic_vec
      type (ClientThreadVector) :: oc_vec

      integer, allocatable :: hist_collection_ids(:)

      type (StringVector) :: vars
      type (FakeBundle), allocatable :: bundle(:)

      integer :: comm
      integer,allocatable :: comms(:)
      integer :: rank
      integer :: npes

      integer :: Xdim
      integer :: Ydim
      integer :: nf
      integer :: lev
      integer :: time

   contains
      procedure :: init
      procedure :: run
      procedure :: finalize

   end type FakeHistData0

contains
   

   subroutine init(this, options, comms,app_ds, N_iclient_g, N_oclient_g, rc)
      class (FakeHistData0),target, intent(inout) :: this
      type (CommandLineOptions0), intent(in) :: options
      integer, intent(in), dimension(:) :: comms
      class(AbstractDirectoryService), target,intent(inout) :: app_ds
      integer, intent(in) :: N_iclient_g, N_oclient_g
      integer, optional, intent(out) :: rc

      integer :: ierror,k, i,status
      type (FileMetadata) :: file_metadata
      type (NetCDF4_FileFormatter) :: formatter
      type (NetCDF4_FileFormatter) :: test_formatter
      type (FileMetadata) :: test_metadata
      real, target,allocatable :: testValues(:,:,:,:,:)
      real, pointer :: testPtr(:)

      type (StringIntegerMap) :: dims
      class(ClientThread), pointer :: threadPtr=>null()     

      this%ic_vec = ClientThreadVector()
      this%oc_vec = ClientThreadVector()

      do i = 1, N_iclient_g
         allocate(threadPtr, source = ClientThread())
         call app_ds%connect_to_server('iserver',threadPtr, comms(i), rc=status)
         _VERIFY(status)
         !call threadPtr%init_connection(app_ds(i)%dsPtr, comms(i),'i_server')
         call this%ic_vec%push_back(threadPtr)
         nullify(threadPtr)
      enddo

      do i = 1 + N_iclient_g, N_iclient_g + N_oclient_g
         allocate(threadPtr, source = ClientThread())
         call app_ds%connect_to_server('oserver',threadPtr, comms(i), rc=status)
         _VERIFY(status)
         !call threadPtr%init_connection(app_ds(i)%dsPtr, comms(i),'o_server')
         call this%oc_vec%push_back(threadPtr)
         nullify(threadPtr)
      enddo

      !this%i_c = ClientThread()
      !call this%i_c%init_connection(app_ds(1)%dsPtr,comms(1),'i_server')

      !this%o_c = ClientThread()
      !call this%o_c%init_connection(app_ds(2)%dsPtr,comms(2),'o_server')

      this%vars = options%requested_variables

      this%comm = comms(1)
      allocate(this%comms, source = comms)
      call MPI_Comm_rank(this%comms(1),this%rank,ierror)
      call MPI_Comm_size(this%comms(1),this%npes,ierror)

      allocate(this%bundle(this%vars%size()))
      allocate(this%hist_collection_ids(10))
  
      if ( this%rank == 0) then
         call test_metadata%add_dimension('Xdim',48)
         call test_metadata%add_dimension('Ydim',48)
         call test_metadata%add_dimension('nf',6)
         call test_metadata%add_dimension('lev',72)
         call test_metadata%add_dimension('time',1)
 
         do k = 1, this%vars%size()
            call test_metadata%add_variable(this%vars%at(k),Variable(type=pFIO_REAL32, dimensions='Xdim,Ydim,nf,lev,time'), rc=status)
            _VERIFY(status)
         enddo

         call test_formatter%create('test_in.nc4', rc=status)
         _VERIFY(status)
         call test_formatter%write(test_metadata, rc=status)
         _VERIFY(status)

         allocate(testValues(48,48,6,72,1))

         do k = 1, this%vars%size()
            do i = 1,6
               testValues(:,:,i,:,:) = 1.0*i*10**k
            enddo
            call c_f_pointer(c_loc(testValues), testPtr,[48*48*72*6*1])
            call test_formatter%put_var(this%vars%at(k),testPtr, start=[1,1,1,1,1], count = [48,48,6,72,1])

         enddo
         call test_formatter%close(rc=status)
      endif

      call Mpi_Barrier(this%comms(1),ierror)

      call formatter%open('test_in.nc4', pFIO_READ)
      file_metadata = formatter%read()
      call formatter%close()

      dims = file_metadata%get_dimensions()

      this%Xdim = dims%at('Xdim')
      this%Ydim = dims%at('Ydim')
      this%nf   = dims%at('nf')
      this%lev  = dims%at('lev')
      this%time = 1

   end subroutine init

   subroutine run(this, step, rc)
      class (FakeHistData0), target, intent(inout) :: this
      integer, intent(in) :: step
      integer, optional, intent(out) :: rc

      type(ArrayReference) :: ref
      type(FileMetadata) :: fmd
      Type(Variable) :: T

      integer :: i_var,i,md_id
      integer :: collection_id, file_md_id, collection_num
      integer,allocatable :: prefetch_ids(:)
      integer,allocatable :: stage_ids(:,:)
      integer :: nx, nf, width, k, ith, jth, Xdim0, Xdim1, Ydim0, Ydim1
      integer :: status

      class(ClientThread), pointer :: icPtr=>null()     
      class(ClientThread), pointer :: ocPtr=>null()     


      i = mod(this%npes, 6)
      if(i /=0 ) then; print*, " make sure the number of reading cores  is multiple of 6"; stop 1;endif

      nx = nint(sqrt(this%npes/6.))
      
      if( nx*nx*6 /= this%npes ) then; print*, " make sure 6*M^2 cores to read"; stop 1; endif

      width = this%Xdim/nx

      nf  = this%rank/(nx*nx) + 1    ! nf th face

      k   = this%rank - (nf-1)*nx*nx ! rank within the face
      ith = k/nx
      jth = mod(k,nx)

      Xdim0 = 1 + ith*width
      Xdim1 = (ith+1)*width
      Ydim0 = 1 + jth*width
      Ydim1 = (jth+1)*width

      ! Establish the collection
      ! In a real use case the collection name would be the ExtData template.
      ! But the actual name does not matter - it is just used to identify
      ! a group of files that have identical metadata (except for time)
      !num_request = 1000

      ! get the input first

      icPtr => this%ic_vec%at(1)
      collection_id = icPtr%add_ext_collection('collection-i')
      !collection_id = this%i_c%add_ext_collection('collection-i')

      allocate(prefetch_ids(this%vars%size()))

      select case (step)
      case (1) ! read the file


         call system_clock(c0)
         do i_var = 1, this%vars%size()
            allocate(this%bundle(i_var)%x(Xdim0:Xdim1,Ydim0:Ydim1,nf:nf,this%lev,1))
            ref = ArrayReference(this%bundle(i_var)%x)
            prefetch_ids(i_var) = &
                 & icPtr%collective_prefetch_data(collection_id,'test_in.nc4', this%vars%at(i_var), ref,&
                 & start=[Xdim0,Ydim0,nf,1,1], &
                 & global_start=[1,1,1,1,1],global_count=[this%Xdim,this%Ydim,this%nf, this%lev,1], rc=status)
            _VERIFY(status)

         end do
         call icPtr%done_collective_prefetch()
         call icPtr%wait_all()

         !do i_var = 1, this%vars%size()
         !   call icPtr%wait(prefetch_ids(i_var))
         !end do

      case (2) ! history out

         call system_clock(c1)
         call fmd%add_dimension('Xdim',this%Xdim)
         call fmd%add_dimension('Ydim',this%Ydim)
         call fmd%add_dimension('lev',this%lev)
         call fmd%add_dimension('nf',this%nf)
         call fmd%add_dimension('time',1)

         T = Variable(type=pFIO_REAL32, dimensions='Xdim,Ydim,nf,lev,time')

         do i_var = 1, this%vars%size() 
            call fmd%add_variable( this%vars%at(i_var),T, rc=status)
            _VERIFY(status)
         enddo

         ocPtr=> this%oc_vec%at(1)
         this%hist_collection_ids(1) = ocPtr%add_hist_collection(fmd)
         this%hist_collection_ids(2) = ocPtr%add_hist_collection(fmd)

         !this%hist_collection_ids(1) = this%o_c%add_hist_collection(fmd)
         collection_num = 2
         allocate(stage_ids(this%vars%size(),collection_num))

         do md_id = 1, collection_num
            ! writing
            file_md_id = this%hist_collection_ids(md_id) 
            do i_var = 1, this%vars%size()
               ref = ArrayReference(this%bundle(i_var)%x)
               stage_ids(i_var,md_id) = &
                 & ocPtr%collective_stage_data(file_md_id, 'test_out'//i_to_string(md_id)//'.nc4', this%vars%at(i_var), ref,&
                 & start=[Xdim0,Ydim0,nf,1, 1], &
                 & global_start=[1,1,1,1,1],global_count=[this%Xdim,this%Ydim, this%nf, this%lev,1], rc=status)
               _VERIFY(status)
            end do
         enddo

         call ocPtr%done_collective_stage()
         call ocPtr%wait_all()
         !call this%o_c%done()
         !do md_id = 1, collection_num
         !   do i_var = 1, this%vars%size()
         !      call ocPtr%wait(stage_ids(i_var,md_id))
         !   enddo
         !end do
         call system_clock(c2)

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1         
      end select
      _RETURN(_SUCCESS)      
   end subroutine run


   subroutine finalize(this)
      class (FakeHistData0), intent(inout) :: this
      integer :: ierror
      class(ClientThread), pointer :: icPtr=>null()
      class(ClientThread), pointer :: ocPtr=>null()
      deallocate(this%bundle)
      call Mpi_Barrier(this%comms(1),ierror)
      print*,"iclient sent terminate signal" 
      icPtr=>this%ic_vec%at(1)
      call icPtr%terminate()
     ! call this%i_c%terminate()
      call Mpi_Barrier(this%comms(2),ierror)
      ocPtr=>this%oc_vec%at(1)
      call ocPtr%terminate()
      print*,"oclient sent terminate signal" 
      !call this%o_c%terminate()
   end subroutine finalize

end module FakeHistData0Mod

program main
   use, intrinsic :: iso_fortran_env, only: REAL32
   use mpi
   use pFIO
   use ctest_io_CLI
   use MAPL_ExceptionHandling
   use FakeHistData0Mod
   use pFlogger, only: pflogger_init => initialize
   implicit none

   integer :: rank, npes, ierror, provided,required
   integer :: status, color, key

   class(BaseServer), target, allocatable :: iserver,oserver
   class(AbstractDirectoryService), allocatable, target :: directory_service

   type (CommandLineOptions0) :: options
   integer, parameter :: NO_COLOR     = 0
   integer, parameter :: iSERVER_COLOR = 1
   integer, parameter :: oSERVER_COLOR = 4
   integer, parameter :: CLIENT_COLOR  = 2
   integer, parameter :: BOTH_COLOR    = 3

   type (FakeHistData0), target :: HistData

   integer :: my_comm_world, my_iComm, my_oComm, my_appcomm

   integer :: client_start, low_rank,up_rank
   integer :: i,k, size_iclient, size_oclient
   integer :: app_start_rank, app_end_rank
   character(len = 20) :: out_file
   character(len = 100):: cmd
   integer :: N_iclient_group, N_oclient_group,N_groups
   integer,allocatable :: local_comm_world(:), app_comms(:)
   integer :: md_id, exit_code
   
   required = MPI_THREAD_MULTIPLE
   !call MPI_init_thread(required, provided, ierror)
   call MPI_init(ierror)
   call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierror)
   call MPI_Comm_size(MPI_COMM_WORLD, npes, ierror)

   call process_command_line(options, rc=status)
   ! split comm_world to local_world

   N_iclient_group = options%N_ig
   N_oclient_group = options%N_og

   N_groups = N_iclient_group + N_oclient_group
 
   allocate(local_comm_world(N_groups))
   allocate(app_comms(N_groups))

   key = 0

   if (options%server_type == 'simple') then
      options%npes_iserver = npes
      options%npes_oserver = npes
      N_iclient_group = 1
      N_oclient_group = 1
      size_iclient = 0
      client_start = 0
      app_start_rank = 0
      app_end_rank = npes-1
   else if (options%server_type == 'mpi' .or. &
            options%server_type == 'multilayer' .or. &
            options%server_type == 'multicomm'  .or. &
            options%server_type == 'multigroup' ) then 
      size_iclient = N_iclient_group*options%npes_iserver
      size_oclient = N_oclient_group*options%npes_oserver
      client_start = npes - size_iclient-size_oclient
      app_start_rank = 0
      app_end_rank   = npes - size_iclient-size_oclient -1

  elseif(options%server_type == 'hybrid') then
      options%npes_iserver = npes - options%npes_oserver
      N_iclient_group = 1
      N_oclient_group = 1
      size_iclient =  N_iclient_group*options%npes_iserver
      client_start = 0
      app_start_rank = 0
      app_end_rank   = size_iclient - 1 
   endif

   directory_service = DirectoryService(MPI_COMM_WORLD)

   ! app + icilent comm
   my_icomm = MPI_COMM_NULL
   my_appcomm = MPI_COMM_NULL

   call pflogger_init()
   do i = 1, N_iclient_group
      low_rank = client_start + (i-1) * options%npes_iserver
      up_rank  = client_start + i*options%npes_iserver
      color = MPI_UNDEFINED
      if (( app_start_rank<= rank .and. rank <= app_end_rank ) .or. ( low_rank <= rank .and. rank < up_rank) ) then
        color = 1
      endif

      call MPI_comm_split(MPI_COMM_WORLD,color,key,local_comm_world(i), ierror)

      if (low_rank <= rank .and. rank < up_rank)  then
         my_comm_world = local_comm_world(i)
      endif

      color = MPI_UNDEFINED
      if ( app_start_rank<= rank .and. rank <= app_end_rank ) then
         color = 1
      endif
      if (low_rank <= rank .and. rank < up_rank)  then
         color = 2
      endif

      app_comms(i) = MPI_COMM_NULL
      if (local_comm_world(i) /= MPI_COMM_NULL) then 
         call MPI_comm_split(local_comm_world(i),color,key,app_comms(i), ierror)
      endif
      
      if ( app_start_rank<= rank .and. rank <= app_end_rank ) then
         my_appcomm = app_comms(i)
      endif

      if (low_rank <= rank .and. rank < up_rank) then
         my_icomm = app_comms(i)
      endif

   enddo
 
   ! app + ocilent comm
   my_ocomm = MPI_COMM_NULL
   do k = 1, N_oclient_group
      i = k + N_iclient_group
      low_rank = client_start+size_iclient + (k-1) * options%npes_oserver
      up_rank  = client_start+size_iclient + k*options%npes_oserver

      color = MPI_UNDEFINED

      if (( app_start_rank<= rank .and. rank <= app_end_rank ) .or. ( low_rank <= rank .and. rank < up_rank) ) then
        color = i
       endif

      call MPI_comm_split(MPI_COMM_WORLD,color,key,local_comm_world(i), ierror)

      if ( low_rank <= rank .and. rank < up_rank)  then
         my_comm_world = local_comm_world(i)
      endif

      color = MPI_UNDEFINED
      if ( app_start_rank<= rank .and. rank <= app_end_rank ) then
        color = i
      endif
      if (low_rank <= rank .and. rank < up_rank) then
        color = i+1
      endif

      app_comms(i) = MPI_COMM_NULL
      if (local_comm_world(i) /= MPI_COMM_NULL) then 
         call MPI_comm_split(local_comm_world(i),color,key,app_comms(i), ierror)
      endif
      
      if (low_rank <= rank .and. rank < up_rank) then
        my_ocomm = app_comms(i)
      endif
   enddo

   if (my_icomm /= MPI_COMM_NULL) then  
      allocate(iserver, source = MpiServer(my_icomm, 'iserver'))
      call directory_service%publish(PortInfo('iserver',iserver), iserver, rc=status)
      if( my_appcomm == MPI_COMM_NULL) then ! mpi server
         call directory_service%connect_to_client('iserver', iserver, rc=status)
         call iserver%start()
      endif
   endif

   if( my_ocomm /= MPI_COMM_NULl) then
     
      if (trim(options%server_type) == 'mpi' .or. &
          trim(options%server_type) == 'simple' .or. &
          trim(options%server_type) == 'hybrid' ) then 
        allocate(oserver, source = MpiServer(my_ocomm, 'oserver'))
      else if (trim(options%server_type) == 'multilayer') then
         allocate(oserver, source = MultiLayerServer(my_ocomm, 'oserver', &
               options%n_writer, options%writer))
      else if (trim(options%server_type) == 'multicomm') then
         allocate(oserver, source = MultiCommServer(my_ocomm, 'oserver', &
               options%n_writer))
      else if (trim(options%server_type) == 'multigroup') then
         allocate(oserver, source = MultiGroupServer(my_ocomm, 'oserver', &
               options%n_writer))
      endif

      call directory_service%publish(PortInfo('oserver',oserver), oserver, rc=status)
      if (my_appcomm == MPI_COMM_NULL) then 
         call directory_service%connect_to_client('oserver', oserver, rc=status)
         call oserver%start()
      endif
   endif

   if ( my_appcomm /= MPI_COMM_NULL) then
      
      print*,"start app rank:", rank

      call histData%init(options,app_comms,directory_service, N_iclient_group, N_oclient_group)
      call histData%run(step=1)
      call histData%run(step=2)
      call histData%finalize()

   end if
  
   call Mpi_Barrier(MPI_COMM_WORLD,ierror)
   call system_clock(c3)

   call directory_service%free_directory_resources()
   call Mpi_Barrier(MPI_COMM_WORLD,ierror)
   exit_code = 0
   if (rank == 0) then
      do md_id = 1, 2
         cmd=''
         out_file = 'test_out'//i_to_string(md_id)//'.nc4'
         cmd = '/usr/bin/diff test_in.nc4 '//trim(out_file)
         print*, "execute_command_line: ", cmd
         call execute_command_line(trim(cmd), exitstat = status )
         if (status == 0) then
            print*, 'test_in.nc4 and '//trim(out_file)//' are the same and thus removed'
            call execute_command_line('/bin/rm -f '//trim(out_file))
         else
            print*, 'test_in.nc4 and '//trim(out_file)//' differ'
            exit_code = 1
         endif
      enddo
      call execute_command_line('/bin/rm -f test_in.nc4')
   endif
   call MPI_finalize(ierror)
   if ( exit_code == 0) stop 0
   if ( exit_code == 1) stop 1
end program main