diff --git a/base/modules/tools/psb_c_tools_mod.f90 b/base/modules/tools/psb_c_tools_mod.f90 index 8fc0a9d6..9be63a39 100644 --- a/base/modules/tools/psb_c_tools_mod.f90 +++ b/base/modules/tools/psb_c_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_c_tools_mod use psb_desc_mod, only : psb_desc_type, psb_spk_, psb_ipk_ use psb_c_vect_mod, only : psb_c_base_vect_type, psb_c_vect_type, psb_i_vect_type - use psb_c_mat_mod, only : psb_cspmat_type, psb_c_base_sparse_mat + use psb_c_mat_mod, only : psb_cspmat_type, psb_c_base_sparse_mat, psb_c_csr_sparse_mat use psb_c_multivect_mod, only : psb_c_base_multivect_type, psb_c_multivect_type interface psb_geall @@ -266,6 +266,17 @@ Module psb_c_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_csphalo + Subroutine psb_c_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + import + implicit none + Type(psb_c_csr_sparse_mat),Intent(in) :: a + Type(psb_c_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_c_csr_halo end interface diff --git a/base/modules/tools/psb_d_tools_mod.f90 b/base/modules/tools/psb_d_tools_mod.f90 index 896d0502..4087ff94 100644 --- a/base/modules/tools/psb_d_tools_mod.f90 +++ b/base/modules/tools/psb_d_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_d_tools_mod use psb_desc_mod, only : psb_desc_type, psb_dpk_, psb_ipk_ use psb_d_vect_mod, only : psb_d_base_vect_type, psb_d_vect_type, psb_i_vect_type - use psb_d_mat_mod, only : psb_dspmat_type, psb_d_base_sparse_mat + use psb_d_mat_mod, only : psb_dspmat_type, psb_d_base_sparse_mat, psb_d_csr_sparse_mat use psb_d_multivect_mod, only : psb_d_base_multivect_type, psb_d_multivect_type interface psb_geall @@ -266,6 +266,17 @@ Module psb_d_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_dsphalo + Subroutine psb_d_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + import + implicit none + Type(psb_d_csr_sparse_mat),Intent(in) :: a + Type(psb_d_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_d_csr_halo end interface diff --git a/base/modules/tools/psb_s_tools_mod.f90 b/base/modules/tools/psb_s_tools_mod.f90 index ace77a25..967fbe4b 100644 --- a/base/modules/tools/psb_s_tools_mod.f90 +++ b/base/modules/tools/psb_s_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_s_tools_mod use psb_desc_mod, only : psb_desc_type, psb_spk_, psb_ipk_ use psb_s_vect_mod, only : psb_s_base_vect_type, psb_s_vect_type, psb_i_vect_type - use psb_s_mat_mod, only : psb_sspmat_type, psb_s_base_sparse_mat + use psb_s_mat_mod, only : psb_sspmat_type, psb_s_base_sparse_mat, psb_s_csr_sparse_mat use psb_s_multivect_mod, only : psb_s_base_multivect_type, psb_s_multivect_type interface psb_geall @@ -266,6 +266,17 @@ Module psb_s_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_ssphalo + Subroutine psb_s_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + import + implicit none + Type(psb_s_csr_sparse_mat),Intent(in) :: a + Type(psb_s_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_s_csr_halo end interface diff --git a/base/modules/tools/psb_z_tools_mod.f90 b/base/modules/tools/psb_z_tools_mod.f90 index 4479fdc1..b2502389 100644 --- a/base/modules/tools/psb_z_tools_mod.f90 +++ b/base/modules/tools/psb_z_tools_mod.f90 @@ -32,7 +32,7 @@ Module psb_z_tools_mod use psb_desc_mod, only : psb_desc_type, psb_dpk_, psb_ipk_ use psb_z_vect_mod, only : psb_z_base_vect_type, psb_z_vect_type, psb_i_vect_type - use psb_z_mat_mod, only : psb_zspmat_type, psb_z_base_sparse_mat + use psb_z_mat_mod, only : psb_zspmat_type, psb_z_base_sparse_mat, psb_z_csr_sparse_mat use psb_z_multivect_mod, only : psb_z_base_multivect_type, psb_z_multivect_type interface psb_geall @@ -266,6 +266,17 @@ Module psb_z_tools_mod character(len=5), optional :: outfmt integer(psb_ipk_), intent(in), optional :: data end Subroutine psb_zsphalo + Subroutine psb_z_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + import + implicit none + Type(psb_z_csr_sparse_mat),Intent(in) :: a + Type(psb_z_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + end Subroutine psb_z_csr_halo end interface diff --git a/base/tools/psb_csphalo.F90 b/base/tools/psb_csphalo.F90 index ee74401e..7de835a7 100644 --- a/base/tools/psb_csphalo.F90 +++ b/base/tools/psb_csphalo.F90 @@ -588,3 +588,525 @@ complex(psb_spk_), intent(out) :: valrcv(:) #endif #endif End Subroutine psb_csphalo + + +Subroutine psb_c_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + use psb_base_mod, psb_protect_name => psb_c_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_c_csr_sparse_mat),Intent(in) :: a + Type(psb_c_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,idx, r, tot_elem,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz, ncg, jpx + integer(psb_mpik_) :: icomm, minfo + integer(psb_mpik_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_ipk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_spk_), allocatable :: valsnd(:) + type(psb_c_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_csphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_def_integer,& + & rvsz,1,psb_mpi_def_integer,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_spk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ja,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call c_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call c_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(izero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine c_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif +complex(psb_spk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: iasnd(:), jasnd(:),ipdxv(:) +complex(psb_spk_), intent(out) :: valrcv(:) + integer(psb_ipk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpik_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpik_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_complex_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_c_spk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_complex_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_c_spk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine c_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine c_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + complex(psb_spk_), intent(in) :: valsnd(:) + complex(psb_spk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine c_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_c_csr_halo diff --git a/base/tools/psb_dsphalo.F90 b/base/tools/psb_dsphalo.F90 index 87fb46d7..98c29144 100644 --- a/base/tools/psb_dsphalo.F90 +++ b/base/tools/psb_dsphalo.F90 @@ -588,3 +588,525 @@ real(psb_dpk_), intent(out) :: valrcv(:) #endif #endif End Subroutine psb_dsphalo + + +Subroutine psb_d_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + use psb_base_mod, psb_protect_name => psb_d_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_d_csr_sparse_mat),Intent(in) :: a + Type(psb_d_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,idx, r, tot_elem,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz, ncg, jpx + integer(psb_mpik_) :: icomm, minfo + integer(psb_mpik_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_ipk_), allocatable :: iasnd(:), jasnd(:) + real(psb_dpk_), allocatable :: valsnd(:) + type(psb_d_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_dsphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_def_integer,& + & rvsz,1,psb_mpi_def_integer,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_dpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ja,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call d_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call d_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(izero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine d_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif +real(psb_dpk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: iasnd(:), jasnd(:),ipdxv(:) +real(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_ipk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpik_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpik_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_double_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_r_dpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_double_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_r_dpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine d_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine d_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + real(psb_dpk_), intent(in) :: valsnd(:) + real(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine d_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_d_csr_halo diff --git a/base/tools/psb_ssphalo.F90 b/base/tools/psb_ssphalo.F90 index e0f134b5..b54d19fd 100644 --- a/base/tools/psb_ssphalo.F90 +++ b/base/tools/psb_ssphalo.F90 @@ -588,3 +588,525 @@ real(psb_spk_), intent(out) :: valrcv(:) #endif #endif End Subroutine psb_ssphalo + + +Subroutine psb_s_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + use psb_base_mod, psb_protect_name => psb_s_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_s_csr_sparse_mat),Intent(in) :: a + Type(psb_s_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,idx, r, tot_elem,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz, ncg, jpx + integer(psb_mpik_) :: icomm, minfo + integer(psb_mpik_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_ipk_), allocatable :: iasnd(:), jasnd(:) + real(psb_spk_), allocatable :: valsnd(:) + type(psb_s_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_ssphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_def_integer,& + & rvsz,1,psb_mpi_def_integer,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_spk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ja,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call s_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call s_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(izero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine s_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif +real(psb_spk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: iasnd(:), jasnd(:),ipdxv(:) +real(psb_spk_), intent(out) :: valrcv(:) + integer(psb_ipk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpik_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpik_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_real_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_r_spk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_real_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_r_spk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine s_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine s_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + real(psb_spk_), intent(in) :: valsnd(:) + real(psb_spk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine s_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_s_csr_halo diff --git a/base/tools/psb_zsphalo.F90 b/base/tools/psb_zsphalo.F90 index 6a85cbe4..e5f321dd 100644 --- a/base/tools/psb_zsphalo.F90 +++ b/base/tools/psb_zsphalo.F90 @@ -588,3 +588,525 @@ complex(psb_dpk_), intent(out) :: valrcv(:) #endif #endif End Subroutine psb_zsphalo + + +Subroutine psb_z_csr_halo(a,desc_a,blk,info,rowcnv,colcnv,& + & rowscale,colscale,data) + use psb_base_mod, psb_protect_name => psb_z_csr_halo + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + Type(psb_z_csr_sparse_mat),Intent(in) :: a + Type(psb_z_csr_sparse_mat),Intent(inout) :: blk + Type(psb_desc_type),Intent(in), target :: desc_a + integer(psb_ipk_), intent(out) :: info + logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale + integer(psb_ipk_), intent(in), optional :: data + ! ...local scalars.... + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc,i, & + & n_el_send,k,n_el_recv,idx, r, tot_elem,& + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz, ncg, jpx + integer(psb_mpik_) :: icomm, minfo + integer(psb_mpik_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:) + integer(psb_ipk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_dpk_), allocatable :: valsnd(:) + type(psb_z_coo_sparse_mat), allocatable :: acoo + integer(psb_ipk_), pointer :: idxv(:) + class(psb_i_base_vect_type), pointer :: pdxv + integer(psb_ipk_), allocatable :: ipdxv(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err + + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='psb_zsphalo' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(rowcnv)) then + rowcnv_ = rowcnv + else + rowcnv_ = .true. + endif + if (present(colcnv)) then + colcnv_ = colcnv + else + colcnv_ = .true. + endif + if (present(rowscale)) then + rowscale_ = rowscale + else + rowscale_ = .false. + endif + if (present(colscale)) then + colscale_ = colscale + else + colscale_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + If (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Data selector',data_ + select case(data_) + case(psb_comm_halo_,psb_comm_ext_ ) + ! Do not accept OVRLAP_INDEX any longer. + case default + call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector') + goto 9999 + end select + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) + ipdxv = pdxv%get_vect() + ! For all rows in the halo descriptor, extract the row size + lnr = 0 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + tot_elem = 0 + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + tot_elem = tot_elem+n_elem + Enddo + sdsz(proc+1) = tot_elem + lnr = lnr + n_el_recv + counter = counter+n_el_send+3 + Enddo + + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! + call mpi_alltoall(sdsz,1,psb_mpi_def_integer,& + & rvsz,1,psb_mpi_def_integer,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + counter = counter+n_el_send+3 + Enddo + + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + l1 = 0 + ipx = 1 + counter=1 + idx = 0 + ! + ! Make sure to get all columns in csget. + ! This is necessary when sphalo is used to compute a transpose, + ! as opposed to just gathering halo for spspmm purposes. + ! + ncg = huge(ncg) + tot_elem = 0 + Do + proc = ipdxv(counter) + if (proc == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + + Do j=0,n_el_send-1 + idx = ipdxv(counter+psb_elem_send_+j) + n_elem = a%get_nz_row(idx) + call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem,jmax=ncg) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_sp_getrow') + goto 9999 + end if + tot_elem = tot_elem+ngtz + Enddo + counter = counter+n_el_send+3 + Enddo + nz = tot_elem + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Going for alltoallv',iszs,iszr + if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_loc_to_glob') + goto 9999 + end if + +#if defined(SP_A2AV_MPI) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_dpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& + & acoo%ja,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) + if (minfo /= mpi_success) info = minfo +#elif defined(SP_A2AV_XI) + call z_my_a2av(valsnd,sdsz,bsdindx,& + & acoo%val,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(iasnd,sdsz,bsdindx,& + & acoo%ia,rvsz,brvindx,ictxt,info) + if (info == psb_success_) call i_my_a2av(jasnd,sdsz,bsdindx,& + & acoo%ja,rvsz,brvindx,ictxt,info) +#elif defined(SP_A2AV_MAT) + call z_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ipdxv,ictxt,icomm,info) +#else + choke on me @! +#endif + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + ! + ! Convert into local numbering + ! + if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I') + if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I') + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psbglob_to_loc') + goto 9999 + end if + + l1 = 0 + call acoo%set_nrows(izero) + ! + irmin = huge(irmin) + icmin = huge(icmin) + irmax = 0 + icmax = 0 + Do i=1,iszr + r=(acoo%ia(i)) + k=(acoo%ja(i)) + ! Just in case some of the conversions were out-of-range + If ((r>0).and.(k>0)) Then + l1=l1+1 + acoo%val(l1) = acoo%val(i) + acoo%ia(l1) = r + acoo%ja(l1) = k + irmin = min(irmin,r) + irmax = max(irmax,r) + icmin = min(icmin,k) + icmax = max(icmax,k) + End If + Enddo + if (rowscale_) then + call acoo%set_nrows(max(irmax-irmin+1,0)) + acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1 + else + call acoo%set_nrows(irmax) + end if + if (colscale_) then + call acoo%set_ncols(max(icmax-icmin+1,0)) + acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1 + else + call acoo%set_ncols(icmax) + end if + + call acoo%set_nzeros(l1) + call acoo%set_sorted(.false.) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),& + & ': End data exchange',counter,l1 + + call acoo%fix(info) + if (info == psb_success_) call acoo%mv_to_fmt(blk,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='psb_spcnv') + goto 9999 + end if + + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +#if defined(SP_A2AV_XI) || defined(SP_A2AV_MAT) +contains + +#if defined(SP_A2AV_MAT) + subroutine z_coo_my_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & valrcv,iarcv,jarcv,rvsz,brvindx,ipdxv,ictxt,icomm,info) + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif +complex(psb_dpk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(in) :: iasnd(:), jasnd(:),ipdxv(:) +complex(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_ipk_), intent(out) :: iarcv(:), jarcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_mpik_), intent(in) :: icomm + integer(psb_ipk_), intent(out) :: info + !Local variables + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz, counter + integer(psb_mpik_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable :: prcid(:), rvhd(:,:) + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = brvindx(ip+1) + p2ptag = psb_dcomplex_swap_tag + call mpi_irecv(valrcv(idx+1:idx+sz),sz,& + & psb_mpi_c_dpk_,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,1),iret) + p2ptag = psb_int_swap_tag + call mpi_irecv(iarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,2),iret) + call mpi_irecv(jarcv(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,rvhd(ip+1,3),iret) + end if + counter = counter+n_el_send+3 + Enddo + + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + if (prcid(ip+1)<0) call psb_get_rank(prcid(ip+1),ictxt,ip) + sz = sdsz(ip+1) + if (sz > 0) then + idx = bsdindx(ip+1) + p2ptag = psb_dcomplex_swap_tag + call mpi_send(valsnd(idx+1:idx+sz),sz,& + & psb_mpi_c_dpk_,prcid(ip+1),& + & p2ptag, icomm,iret) + p2ptag = psb_int_swap_tag + call mpi_send(iasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + call mpi_send(jasnd(idx+1:idx+sz),sz,& + & psb_mpi_ipk_integer,prcid(ip+1),& + & p2ptag, icomm,iret) + end if + counter = counter+n_el_send+3 + Enddo + + counter=1 + Do + ip=ipdxv(counter) + if (ip == -1) exit + n_el_recv = ipdxv(counter+psb_n_elem_recv_) + counter = counter+n_el_recv + n_el_send = ipdxv(counter+psb_n_elem_send_) + sz = rvsz(ip+1) + if (sz > 0) then + call mpi_wait(rvhd(ip+1,1),p2pstat,iret) + call mpi_wait(rvhd(ip+1,2),p2pstat,iret) + call mpi_wait(rvhd(ip+1,3),p2pstat,iret) + end if + counter = counter+n_el_send+3 + Enddo + + + end subroutine z_coo_my_a2av +#endif +#if defined(SP_A2AV_XI) + subroutine z_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + complex(psb_dpk_), intent(in) :: valsnd(:) + complex(psb_dpk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine z_my_a2av + + subroutine i_my_a2av(valsnd,sdsz,bsdindx,& + & valrcv,rvsz,brvindx,ictxt,info) + integer(psb_ipk_), intent(in) :: valsnd(:) + integer(psb_ipk_), intent(out) :: valrcv(:) + integer(psb_mpik_), intent(in) :: bsdindx(:), brvindx(:), sdsz(:), rvsz(:) + integer(psb_ipk_), intent(in) :: ictxt + integer(psb_ipk_), intent(out) :: info + + + integer(psb_ipk_) :: iam, np, i,j,k, ip, ipx, idx, sz + + call psb_info(ictxt,iam,np) + if (min(size(bsdindx),size(brvindx),size(sdsz),size(rvsz)) 0) then + idx = bsdindx(ip+1) + call psb_snd(ictxt,valsnd(idx+1:idx+sz),ip) + end if + end do + + do ip = 0, np-1 + sz = rvsz(ip+1) + if (sz > 0) then + idx = brvindx(ip+1) + call psb_rcv(ictxt,valrcv(idx+1:idx+sz),ip) + end if + end do + + end subroutine i_my_a2av +#endif +#endif +End Subroutine psb_z_csr_halo