From 71059dc783f6476b890bb5175f82df998a70eb77 Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Fri, 13 Sep 2019 13:58:42 +0100 Subject: [PATCH] Version of sphalo for LX_csr. --- base/modules/tools/psb_c_tools_mod.f90 | 12 + base/modules/tools/psb_d_tools_mod.f90 | 12 + base/modules/tools/psb_s_tools_mod.f90 | 12 + base/modules/tools/psb_z_tools_mod.f90 | 12 + base/tools/psb_csphalo.F90 | 541 +++++++++++++++++++++++++ base/tools/psb_dsphalo.F90 | 541 +++++++++++++++++++++++++ base/tools/psb_ssphalo.F90 | 541 +++++++++++++++++++++++++ base/tools/psb_zsphalo.F90 | 541 +++++++++++++++++++++++++ 8 files changed, 2212 insertions(+) diff --git a/base/modules/tools/psb_c_tools_mod.f90 b/base/modules/tools/psb_c_tools_mod.f90 index c190521b..cf838af4 100644 --- a/base/modules/tools/psb_c_tools_mod.f90 +++ b/base/modules/tools/psb_c_tools_mod.f90 @@ -209,6 +209,18 @@ Module psb_c_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_lcsphalo + Subroutine psb_lc_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + import + implicit none + Type(psb_lc_csr_sparse_mat),Intent(in) :: a + Type(psb_lc_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + end Subroutine psb_lc_csr_halo end interface diff --git a/base/modules/tools/psb_d_tools_mod.f90 b/base/modules/tools/psb_d_tools_mod.f90 index ce3b4210..c88795b6 100644 --- a/base/modules/tools/psb_d_tools_mod.f90 +++ b/base/modules/tools/psb_d_tools_mod.f90 @@ -209,6 +209,18 @@ Module psb_d_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_ldsphalo + Subroutine psb_ld_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + import + implicit none + Type(psb_ld_csr_sparse_mat),Intent(in) :: a + Type(psb_ld_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + end Subroutine psb_ld_csr_halo end interface diff --git a/base/modules/tools/psb_s_tools_mod.f90 b/base/modules/tools/psb_s_tools_mod.f90 index 7bc0e791..8a67ed5f 100644 --- a/base/modules/tools/psb_s_tools_mod.f90 +++ b/base/modules/tools/psb_s_tools_mod.f90 @@ -209,6 +209,18 @@ Module psb_s_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_lssphalo + Subroutine psb_ls_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + import + implicit none + Type(psb_ls_csr_sparse_mat),Intent(in) :: a + Type(psb_ls_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + end Subroutine psb_ls_csr_halo end interface diff --git a/base/modules/tools/psb_z_tools_mod.f90 b/base/modules/tools/psb_z_tools_mod.f90 index 1b1ce21e..476dfd02 100644 --- a/base/modules/tools/psb_z_tools_mod.f90 +++ b/base/modules/tools/psb_z_tools_mod.f90 @@ -209,6 +209,18 @@ Module psb_z_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_lzsphalo + Subroutine psb_lz_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + import + implicit none + Type(psb_lz_csr_sparse_mat),Intent(in) :: a + Type(psb_lz_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + end Subroutine psb_lz_csr_halo end interface diff --git a/base/tools/psb_csphalo.F90 b/base/tools/psb_csphalo.F90 index 854d65f4..1e62ebec 100644 --- a/base/tools/psb_csphalo.F90 +++ b/base/tools/psb_csphalo.F90 @@ -55,6 +55,9 @@ ! psb_comm_ext_ use ext_index ! psb_comm_ovrl_ DISABLED for this routine. ! +#undef SP_A2AV_MPI +#undef SP_A2AV_XI +#define SP_A2AV_MAT Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& & rowscale,colscale,outfmt,data) use psb_base_mod, psb_protect_name => psb_csphalo @@ -804,3 +807,541 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_lcsphalo + +Subroutine psb_lc_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + use psb_base_mod, psb_protect_name => psb_lc_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_lc_csr_sparse_mat),Intent(in) :: a + type(psb_lc_csr_sparse_mat),Intent(inout) :: blk + type(psb_desc_type),intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,r,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,data_,totxch,nxs, nxr,& + & err_act, nsnds, nrcvs + integer(psb_lpk_) :: ngtz,irmax,icmax,l1, lnr, lnc, lnnz, ncg, jpx, idx, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_spk_), allocatable :: valsnd(:) + type(psb_lc_coo_sparse_mat), allocatable :: acoo + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + Type(psb_desc_type), pointer :: col_desc_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_lc_csr_sphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + if (present(outcol_glob)) then + outcol_glob_ = outcol_glob + else + outcol_glob_ = .false. + endif + if (present(col_desc)) then + col_desc_ => col_desc + else + col_desc_ => desc_a + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),col_desc_,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_spk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call lc_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call lc_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + ! + ! This seems to be the correct output condition + ! + if (colcnv_.and.(.not.outcol_glob_)) & + & call psb_glob_to_loc(acoo%ja(1:iszr),col_desc_,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine lc_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + complex(psb_spk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: ipdxv(:) + integer(psb_lpk_), intent(in) :: iasnd(:), jasnd(:) + complex(psb_spk_), intent(out) :: valrcv(:) + integer(psb_lpk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpk_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpk_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpk_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_complex_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_c_spk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_complex_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_c_spk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine lc_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine lc_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + complex(psb_spk_), intent(in) :: valsnd(:) + complex(psb_spk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine lc_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_lc_csr_halo diff --git a/base/tools/psb_dsphalo.F90 b/base/tools/psb_dsphalo.F90 index 94f512c7..7efb8244 100644 --- a/base/tools/psb_dsphalo.F90 +++ b/base/tools/psb_dsphalo.F90 @@ -55,6 +55,9 @@ ! psb_comm_ext_ use ext_index ! psb_comm_ovrl_ DISABLED for this routine. ! +#undef SP_A2AV_MPI +#undef SP_A2AV_XI +#define SP_A2AV_MAT Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& & rowscale,colscale,outfmt,data) use psb_base_mod, psb_protect_name => psb_dsphalo @@ -804,3 +807,541 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_ldsphalo + +Subroutine psb_ld_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + use psb_base_mod, psb_protect_name => psb_ld_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_ld_csr_sparse_mat),Intent(in) :: a + type(psb_ld_csr_sparse_mat),Intent(inout) :: blk + type(psb_desc_type),intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,r,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,data_,totxch,nxs, nxr,& + & err_act, nsnds, nrcvs + integer(psb_lpk_) :: ngtz,irmax,icmax,l1, lnr, lnc, lnnz, ncg, jpx, idx, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + real(psb_dpk_), allocatable :: valsnd(:) + type(psb_ld_coo_sparse_mat), allocatable :: acoo + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + Type(psb_desc_type), pointer :: col_desc_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_ld_csr_sphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + if (present(outcol_glob)) then + outcol_glob_ = outcol_glob + else + outcol_glob_ = .false. + endif + if (present(col_desc)) then + col_desc_ => col_desc + else + col_desc_ => desc_a + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),col_desc_,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_dpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call ld_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call ld_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + ! + ! This seems to be the correct output condition + ! + if (colcnv_.and.(.not.outcol_glob_)) & + & call psb_glob_to_loc(acoo%ja(1:iszr),col_desc_,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine ld_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + real(psb_dpk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: ipdxv(:) + integer(psb_lpk_), intent(in) :: iasnd(:), jasnd(:) + real(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_lpk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpk_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpk_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpk_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_double_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_r_dpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_double_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_r_dpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine ld_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine ld_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + real(psb_dpk_), intent(in) :: valsnd(:) + real(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine ld_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_ld_csr_halo diff --git a/base/tools/psb_ssphalo.F90 b/base/tools/psb_ssphalo.F90 index fa200bb0..c5d4e602 100644 --- a/base/tools/psb_ssphalo.F90 +++ b/base/tools/psb_ssphalo.F90 @@ -55,6 +55,9 @@ ! psb_comm_ext_ use ext_index ! psb_comm_ovrl_ DISABLED for this routine. ! +#undef SP_A2AV_MPI +#undef SP_A2AV_XI +#define SP_A2AV_MAT Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& & rowscale,colscale,outfmt,data) use psb_base_mod, psb_protect_name => psb_ssphalo @@ -804,3 +807,541 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_lssphalo + +Subroutine psb_ls_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + use psb_base_mod, psb_protect_name => psb_ls_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_ls_csr_sparse_mat),Intent(in) :: a + type(psb_ls_csr_sparse_mat),Intent(inout) :: blk + type(psb_desc_type),intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,r,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,data_,totxch,nxs, nxr,& + & err_act, nsnds, nrcvs + integer(psb_lpk_) :: ngtz,irmax,icmax,l1, lnr, lnc, lnnz, ncg, jpx, idx, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + real(psb_spk_), allocatable :: valsnd(:) + type(psb_ls_coo_sparse_mat), allocatable :: acoo + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + Type(psb_desc_type), pointer :: col_desc_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_ls_csr_sphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + if (present(outcol_glob)) then + outcol_glob_ = outcol_glob + else + outcol_glob_ = .false. + endif + if (present(col_desc)) then + col_desc_ => col_desc + else + col_desc_ => desc_a + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),col_desc_,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_spk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call ls_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call ls_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + ! + ! This seems to be the correct output condition + ! + if (colcnv_.and.(.not.outcol_glob_)) & + & call psb_glob_to_loc(acoo%ja(1:iszr),col_desc_,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine ls_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + real(psb_spk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: ipdxv(:) + integer(psb_lpk_), intent(in) :: iasnd(:), jasnd(:) + real(psb_spk_), intent(out) :: valrcv(:) + integer(psb_lpk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpk_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpk_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpk_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_real_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_r_spk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_real_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_r_spk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine ls_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine ls_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + real(psb_spk_), intent(in) :: valsnd(:) + real(psb_spk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine ls_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_ls_csr_halo diff --git a/base/tools/psb_zsphalo.F90 b/base/tools/psb_zsphalo.F90 index ced01ba4..b6427ec1 100644 --- a/base/tools/psb_zsphalo.F90 +++ b/base/tools/psb_zsphalo.F90 @@ -55,6 +55,9 @@ ! psb_comm_ext_ use ext_index ! psb_comm_ovrl_ DISABLED for this routine. ! +#undef SP_A2AV_MPI +#undef SP_A2AV_XI +#define SP_A2AV_MAT Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& & rowscale,colscale,outfmt,data) use psb_base_mod, psb_protect_name => psb_zsphalo @@ -804,3 +807,541 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_lzsphalo + +Subroutine psb_lz_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data,outcol_glob,col_desc) + use psb_base_mod, psb_protect_name => psb_lz_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_lz_csr_sparse_mat),Intent(in) :: a + type(psb_lz_csr_sparse_mat),Intent(inout) :: blk + type(psb_desc_type),intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale,outcol_glob + integer(psb_ipk_), intent(in), optional :: data + type(psb_desc_type),Intent(in), optional, target :: col_desc + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,r,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,data_,totxch,nxs, nxr,& + & err_act, nsnds, nrcvs + integer(psb_lpk_) :: ngtz,irmax,icmax,l1, lnr, lnc, lnnz, ncg, jpx, idx, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_dpk_), allocatable :: valsnd(:) + type(psb_lz_coo_sparse_mat), allocatable :: acoo + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + Type(psb_desc_type), pointer :: col_desc_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_lz_csr_sphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + if (present(outcol_glob)) then + outcol_glob_ = outcol_glob + else + outcol_glob_ = .false. + endif + if (present(col_desc)) then + col_desc_ => col_desc + else + col_desc_ => desc_a + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),col_desc_,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_dpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call lz_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call lz_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + ! + ! This seems to be the correct output condition + ! + if (colcnv_.and.(.not.outcol_glob_)) & + & call psb_glob_to_loc(acoo%ja(1:iszr),col_desc_,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine lz_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + complex(psb_dpk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: ipdxv(:) + integer(psb_lpk_), intent(in) :: iasnd(:), jasnd(:) + complex(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_lpk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpk_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpk_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpk_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_dcomplex_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_c_dpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_dcomplex_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_c_dpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_lpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine lz_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine lz_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + complex(psb_dpk_), intent(in) :: valsnd(:) + complex(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine lz_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpk_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_lz_csr_halo