diff --git a/base/modules/tools/psb_c_tools_mod.f90 b/base/modules/tools/psb_c_tools_mod.f90 index 49542ba2..7257f592 100644 --- a/base/modules/tools/psb_c_tools_mod.f90 +++ b/base/modules/tools/psb_c_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_c_tools_mod use psb_desc_mod, only : psb_desc_type, psb_spk_, psb_ipk_, psb_lpk_ use psb_c_vect_mod, only : psb_c_base_vect_type, psb_c_vect_type - use psb_c_mat_mod, only : psb_cspmat_type, psb_c_base_sparse_mat + use psb_c_mat_mod, only : psb_cspmat_type, psb_lcspmat_type, psb_c_base_sparse_mat use psb_l_vect_mod, only : psb_l_vect_type use psb_c_multivect_mod, only : psb_c_base_multivect_type, psb_c_multivect_type @@ -197,6 +197,18 @@ Module psb_c_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_csphalo + Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + import + implicit none + Type(psb_lcspmat_type),Intent(in) :: a + Type(psb_lcspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_lcsphalo end interface diff --git a/base/modules/tools/psb_d_tools_mod.f90 b/base/modules/tools/psb_d_tools_mod.f90 index 7f2dbee2..624a7e07 100644 --- a/base/modules/tools/psb_d_tools_mod.f90 +++ b/base/modules/tools/psb_d_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_d_tools_mod use psb_desc_mod, only : psb_desc_type, psb_dpk_, psb_ipk_, psb_lpk_ use psb_d_vect_mod, only : psb_d_base_vect_type, psb_d_vect_type - use psb_d_mat_mod, only : psb_dspmat_type, psb_d_base_sparse_mat + use psb_d_mat_mod, only : psb_dspmat_type, psb_ldspmat_type, psb_d_base_sparse_mat use psb_l_vect_mod, only : psb_l_vect_type use psb_d_multivect_mod, only : psb_d_base_multivect_type, psb_d_multivect_type @@ -197,6 +197,18 @@ Module psb_d_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_dsphalo + Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + import + implicit none + Type(psb_ldspmat_type),Intent(in) :: a + Type(psb_ldspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_ldsphalo end interface diff --git a/base/modules/tools/psb_s_tools_mod.f90 b/base/modules/tools/psb_s_tools_mod.f90 index 8fd29c04..aaed4bb1 100644 --- a/base/modules/tools/psb_s_tools_mod.f90 +++ b/base/modules/tools/psb_s_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_s_tools_mod use psb_desc_mod, only : psb_desc_type, psb_spk_, psb_ipk_, psb_lpk_ use psb_s_vect_mod, only : psb_s_base_vect_type, psb_s_vect_type - use psb_s_mat_mod, only : psb_sspmat_type, psb_s_base_sparse_mat + use psb_s_mat_mod, only : psb_sspmat_type, psb_lsspmat_type, psb_s_base_sparse_mat use psb_l_vect_mod, only : psb_l_vect_type use psb_s_multivect_mod, only : psb_s_base_multivect_type, psb_s_multivect_type @@ -197,6 +197,18 @@ Module psb_s_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_ssphalo + Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + import + implicit none + Type(psb_lsspmat_type),Intent(in) :: a + Type(psb_lsspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_lssphalo end interface diff --git a/base/modules/tools/psb_z_tools_mod.f90 b/base/modules/tools/psb_z_tools_mod.f90 index 79e630f1..e4648d9f 100644 --- a/base/modules/tools/psb_z_tools_mod.f90 +++ b/base/modules/tools/psb_z_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_z_tools_mod use psb_desc_mod, only : psb_desc_type, psb_dpk_, psb_ipk_, psb_lpk_ use psb_z_vect_mod, only : psb_z_base_vect_type, psb_z_vect_type - use psb_z_mat_mod, only : psb_zspmat_type, psb_z_base_sparse_mat + use psb_z_mat_mod, only : psb_zspmat_type, psb_lzspmat_type, psb_z_base_sparse_mat use psb_l_vect_mod, only : psb_l_vect_type use psb_z_multivect_mod, only : psb_z_base_multivect_type, psb_z_multivect_type @@ -197,6 +197,18 @@ Module psb_z_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_zsphalo + Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + import + implicit none + Type(psb_lzspmat_type),Intent(in) :: a + Type(psb_lzspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_lzsphalo end interface diff --git a/base/tools/psb_csphalo.F90 b/base/tools/psb_csphalo.F90 index 520f0f66..c8f6377c 100644 --- a/base/tools/psb_csphalo.F90 +++ b/base/tools/psb_csphalo.F90 @@ -182,7 +182,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& idxs = 0 idxr = 0 - call acoo%allocate(izero,a%get_ncols(),info) + call acoo%allocate(izero,a%get_ncols()) call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) @@ -467,3 +467,324 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_csphalo + + +Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + use psb_base_mod, psb_protect_name => psb_lcsphalo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_lcspmat_type),Intent(in) :: a + Type(psb_lcspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter, proc, i, & + & n_el_send,n_el_recv,& + & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& + & data_,totxch,nxs, nxr + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_spk_), allocatable :: valsnd(:) + type(psb_lc_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_csphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + if (present(outfmt)) then + outfmt_ = psb_toupper(outfmt) + else + outfmt_ = 'CSR' + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + ipx = 1 + brvindx(ipx) = 0 + bsdindx(ipx) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + lnc = a%get_ncols() + call acoo%allocate(lzero,lnc) + + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract and send/receive. + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + counter = counter+n_el_send+3 + Enddo + + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr=sum(rvsz) + call acoo%reallocate(max(iszr,1)) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + mat_recv = iszr + iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_; ch_err='psb_sp_reall' + call psb_errpush(info,name,a_err=ch_err); goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + + tot_elem=0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv=ipdxv(counter+psb_n_elem_recv_) + counter=counter+n_el_recv + n_el_send=ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_sp_getrow' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + tot_elem=tot_elem+n_elem + Enddo + ipx = ipx + 1 + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_loc_to_glob' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_spk_,icomm,minfo) + call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoallv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psbglob_to_loc' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call move_alloc(acoo,blk%a) + + ! Do we expect any duplicates to appear???? + call blk%cscnv(info,type=outfmt_,dupl=psb_dupl_add_) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spcnv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +End Subroutine psb_lcsphalo diff --git a/base/tools/psb_dsphalo.F90 b/base/tools/psb_dsphalo.F90 index e76d03b3..18fce4e9 100644 --- a/base/tools/psb_dsphalo.F90 +++ b/base/tools/psb_dsphalo.F90 @@ -182,7 +182,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idxs = 0 idxr = 0 - call acoo%allocate(izero,a%get_ncols(),info) + call acoo%allocate(izero,a%get_ncols()) call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) @@ -467,3 +467,324 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_dsphalo + + +Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + use psb_base_mod, psb_protect_name => psb_ldsphalo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_ldspmat_type),Intent(in) :: a + Type(psb_ldspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter, proc, i, & + & n_el_send,n_el_recv,& + & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& + & data_,totxch,nxs, nxr + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + real(psb_dpk_), allocatable :: valsnd(:) + type(psb_ld_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_dsphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + if (present(outfmt)) then + outfmt_ = psb_toupper(outfmt) + else + outfmt_ = 'CSR' + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + ipx = 1 + brvindx(ipx) = 0 + bsdindx(ipx) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + lnc = a%get_ncols() + call acoo%allocate(lzero,lnc) + + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract and send/receive. + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + counter = counter+n_el_send+3 + Enddo + + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr=sum(rvsz) + call acoo%reallocate(max(iszr,1)) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + mat_recv = iszr + iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_; ch_err='psb_sp_reall' + call psb_errpush(info,name,a_err=ch_err); goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + + tot_elem=0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv=ipdxv(counter+psb_n_elem_recv_) + counter=counter+n_el_recv + n_el_send=ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_sp_getrow' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + tot_elem=tot_elem+n_elem + Enddo + ipx = ipx + 1 + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_loc_to_glob' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_dpk_,icomm,minfo) + call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoallv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psbglob_to_loc' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call move_alloc(acoo,blk%a) + + ! Do we expect any duplicates to appear???? + call blk%cscnv(info,type=outfmt_,dupl=psb_dupl_add_) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spcnv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +End Subroutine psb_ldsphalo diff --git a/base/tools/psb_ssphalo.F90 b/base/tools/psb_ssphalo.F90 index 8053e42b..23b6e6ec 100644 --- a/base/tools/psb_ssphalo.F90 +++ b/base/tools/psb_ssphalo.F90 @@ -182,7 +182,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& idxs = 0 idxr = 0 - call acoo%allocate(izero,a%get_ncols(),info) + call acoo%allocate(izero,a%get_ncols()) call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) @@ -467,3 +467,324 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_ssphalo + + +Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + use psb_base_mod, psb_protect_name => psb_lssphalo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_lsspmat_type),Intent(in) :: a + Type(psb_lsspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter, proc, i, & + & n_el_send,n_el_recv,& + & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& + & data_,totxch,nxs, nxr + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + real(psb_spk_), allocatable :: valsnd(:) + type(psb_ls_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_ssphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + if (present(outfmt)) then + outfmt_ = psb_toupper(outfmt) + else + outfmt_ = 'CSR' + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + ipx = 1 + brvindx(ipx) = 0 + bsdindx(ipx) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + lnc = a%get_ncols() + call acoo%allocate(lzero,lnc) + + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract and send/receive. + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + counter = counter+n_el_send+3 + Enddo + + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr=sum(rvsz) + call acoo%reallocate(max(iszr,1)) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + mat_recv = iszr + iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_; ch_err='psb_sp_reall' + call psb_errpush(info,name,a_err=ch_err); goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + + tot_elem=0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv=ipdxv(counter+psb_n_elem_recv_) + counter=counter+n_el_recv + n_el_send=ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_sp_getrow' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + tot_elem=tot_elem+n_elem + Enddo + ipx = ipx + 1 + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_loc_to_glob' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_spk_,icomm,minfo) + call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoallv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psbglob_to_loc' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call move_alloc(acoo,blk%a) + + ! Do we expect any duplicates to appear???? + call blk%cscnv(info,type=outfmt_,dupl=psb_dupl_add_) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spcnv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +End Subroutine psb_lssphalo diff --git a/base/tools/psb_zsphalo.F90 b/base/tools/psb_zsphalo.F90 index 2dffbe1d..25f2c180 100644 --- a/base/tools/psb_zsphalo.F90 +++ b/base/tools/psb_zsphalo.F90 @@ -182,7 +182,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idxs = 0 idxr = 0 - call acoo%allocate(izero,a%get_ncols(),info) + call acoo%allocate(izero,a%get_ncols()) call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) @@ -467,3 +467,324 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& return End Subroutine psb_zsphalo + + +Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,outfmt,data) + use psb_base_mod, psb_protect_name => psb_lzsphalo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_lzspmat_type),Intent(in) :: a + Type(psb_lzspmat_type),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + character(len=5), optional :: outfmt + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter, proc, i, & + & n_el_send,n_el_recv,& + & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& + & data_,totxch,nxs, nxr + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_dpk_), allocatable :: valsnd(:) + type(psb_lz_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_zsphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + if (present(outfmt)) then + outfmt_ = psb_toupper(outfmt) + else + outfmt_ = 'CSR' + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + ipx = 1 + brvindx(ipx) = 0 + bsdindx(ipx) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + lnc = a%get_ncols() + call acoo%allocate(lzero,lnc) + + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract and send/receive. + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + counter = counter+n_el_send+3 + Enddo + + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr=sum(rvsz) + call acoo%reallocate(max(iszr,1)) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + mat_recv = iszr + iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_; ch_err='psb_sp_reall' + call psb_errpush(info,name,a_err=ch_err); goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + + tot_elem=0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv=ipdxv(counter+psb_n_elem_recv_) + counter=counter+n_el_recv + n_el_send=ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_sp_getrow' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + tot_elem=tot_elem+n_elem + Enddo + ipx = ipx + 1 + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_loc_to_glob' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_dpk_,icomm,minfo) + call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='mpi_alltoallv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psbglob_to_loc' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(lzero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call move_alloc(acoo,blk%a) + + ! Do we expect any duplicates to appear???? + call blk%cscnv(info,type=outfmt_,dupl=psb_dupl_add_) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spcnv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +End Subroutine psb_lzsphalo