base/tools/psb_csphalo.F90
 base/tools/psb_dsphalo.F90
 base/tools/psb_ssphalo.F90
 base/tools/psb_zsphalo.F90

Fix usage of move_alloc for polymorphic variables.
psblas3-type-indexed
Salvatore Filippone 13 years ago
parent b0e7e05899
commit 523056dbb6

@ -57,7 +57,6 @@
!
Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& rowscale,colscale,outfmt,data)
use psb_base_mod, psb_protect_name => psb_csphalo
#ifdef MPI_MOD
@ -84,7 +83,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:)
complex(psb_spk_), allocatable :: valsnd(:)
type(psb_c_coo_sparse_mat), allocatable :: acoo
class(psb_c_base_sparse_mat), allocatable :: acoo
integer, pointer :: idxv(:)
logical :: rowcnv_,colcnv_,rowscale_,colscale_
character(len=5) :: outfmt_
@ -156,190 +155,209 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
case(psb_comm_ext_)
idxv => desc_a%ext_index
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
case default
call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector')
goto 9999
end select
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
allocate(psb_c_coo_sparse_mat :: acoo,stat=info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
info = psb_err_alloc_dealloc_
call psb_errpush(info,name)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
select type (acoo)
type is (psb_c_coo_sparse_mat)
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_complex,&
& acoo%val,rvsz,brvindx,mpi_complex,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_complex,&
& acoo%val,rvsz,brvindx,mpi_complex,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
class default
! This is impossible
info = psb_err_internal_error_
call psb_Errpush(info,name)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
end select
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),&

@ -55,7 +55,6 @@
! psb_comm_ext_ use ext_index
! psb_comm_ovrl_ DISABLED for this routine.
!
!
Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& rowscale,colscale,outfmt,data)
use psb_base_mod, psb_protect_name => psb_dsphalo
@ -68,9 +67,9 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
include 'mpif.h'
#endif
type(psb_dspmat_type),intent(in) :: a
type(psb_dspmat_type),intent(inout) :: blk
type(psb_desc_type),intent(in), target :: desc_a
Type(psb_dspmat_type),Intent(in) :: a
Type(psb_dspmat_type),Intent(inout) :: blk
Type(psb_desc_type),Intent(in), target :: desc_a
integer, intent(out) :: info
logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale
character(len=5), optional :: outfmt
@ -84,15 +83,15 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:)
real(psb_dpk_), allocatable :: valsnd(:)
type(psb_d_coo_sparse_mat), allocatable :: acoo
class(psb_d_base_sparse_mat), allocatable :: acoo
integer, pointer :: idxv(:)
logical :: rowcnv_,colcnv_,rowscale_,colscale_
character(len=5) :: outfmt_
integer :: debug_level, debug_unit
character(len=20) :: name, ch_err
if(psb_get_errstatus() /= 0) return
info=psb_success_
if (psb_errstatus_fatal()) return
name='psb_dsphalo'
call psb_erractionsave(err_act)
debug_unit = psb_get_debug_unit()
@ -133,12 +132,6 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
outfmt_ = 'CSR'
endif
if (.not.desc_a%is_asb()) then
info = psb_err_invalid_cd_state_
call psb_errpush(info,name)
goto 9999
end if
ictxt = desc_a%get_context()
icomm = desc_a%get_mpic()
@ -162,190 +155,209 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
case(psb_comm_ext_)
idxv => desc_a%ext_index
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
case default
call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector')
goto 9999
end select
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
allocate(psb_d_coo_sparse_mat :: acoo,stat=info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
info = psb_err_alloc_dealloc_
call psb_errpush(info,name)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
select type (acoo)
type is (psb_d_coo_sparse_mat)
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_precision,&
& acoo%val,rvsz,brvindx,mpi_double_precision,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_precision,&
& acoo%val,rvsz,brvindx,mpi_double_precision,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
class default
! This is impossible
info = psb_err_internal_error_
call psb_Errpush(info,name)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
end select
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),&

@ -55,7 +55,6 @@
! psb_comm_ext_ use ext_index
! psb_comm_ovrl_ DISABLED for this routine.
!
!
Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& rowscale,colscale,outfmt,data)
use psb_base_mod, psb_protect_name => psb_ssphalo
@ -70,7 +69,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
Type(psb_sspmat_type),Intent(in) :: a
Type(psb_sspmat_type),Intent(inout) :: blk
Type(psb_desc_type),Intent(in), target :: desc_a
Type(psb_desc_type),Intent(in), target :: desc_a
integer, intent(out) :: info
logical, optional, intent(in) :: rowcnv,colcnv,rowscale,colscale
character(len=5), optional :: outfmt
@ -84,7 +83,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:)
real(psb_spk_), allocatable :: valsnd(:)
type(psb_s_coo_sparse_mat), allocatable :: acoo
class(psb_s_base_sparse_mat), allocatable :: acoo
integer, pointer :: idxv(:)
logical :: rowcnv_,colcnv_,rowscale_,colscale_
character(len=5) :: outfmt_
@ -156,190 +155,209 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
case(psb_comm_ext_)
idxv => desc_a%ext_index
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
case default
call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector')
goto 9999
end select
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
allocate(psb_s_coo_sparse_mat :: acoo,stat=info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
info = psb_err_alloc_dealloc_
call psb_errpush(info,name)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
select type (acoo)
type is (psb_s_coo_sparse_mat)
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_real,&
& acoo%val,rvsz,brvindx,mpi_real,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_real,&
& acoo%val,rvsz,brvindx,mpi_real,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
class default
! This is impossible
info = psb_err_internal_error_
call psb_Errpush(info,name)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
end select
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),&

@ -83,7 +83,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:)
complex(psb_dpk_), allocatable :: valsnd(:)
type(psb_z_coo_sparse_mat), allocatable :: acoo
class(psb_z_base_sparse_mat), allocatable :: acoo
integer, pointer :: idxv(:)
logical :: rowcnv_,colcnv_,rowscale_,colscale_
character(len=5) :: outfmt_
@ -155,190 +155,209 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
case(psb_comm_ext_)
idxv => desc_a%ext_index
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
! !$ case(psb_comm_ovr_)
! !$ idxv => desc_a%ovrlap_index
! Do not accept OVRLAP_INDEX any longer.
case default
call psb_errpush(psb_err_from_subroutine_,name,a_err='wrong Data selector')
goto 9999
end select
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
allocate(psb_z_coo_sparse_mat :: acoo,stat=info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
info = psb_err_alloc_dealloc_
call psb_errpush(info,name)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
select type (acoo)
type is (psb_z_coo_sparse_mat)
l1 = 0
sdsz(:)=0
rvsz(:)=0
ipx = 1
brvindx(ipx) = 0
bsdindx(ipx) = 0
counter=1
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(0,a%get_ncols(),info)
! For all rows in the halo descriptor, extract and send/receive.
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
counter = counter+n_el_send+3
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_complex,&
& acoo%val,rvsz,brvindx,mpi_double_complex,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv
n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_el_send+3
Enddo
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
l1 = 0
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
tot_elem=tot_elem+n_elem
Enddo
ipx = ipx + 1
counter = counter+n_el_send+3
Enddo
nz = tot_elem
if (rowcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (colcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_complex,&
& acoo%val,rvsz,brvindx,mpi_double_complex,icomm,info)
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ia,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& acoo%ja,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
!
! Convert into local numbering
!
if (rowcnv_) call psb_glob_to_loc(acoo%ia(1:iszr),desc_a,info,iact='I')
if (colcnv_) call psb_glob_to_loc(acoo%ja(1:iszr),desc_a,info,iact='I')
if (info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psbglob_to_loc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
class default
! This is impossible
info = psb_err_internal_error_
call psb_Errpush(info,name)
goto 9999
end if
l1 = 0
call acoo%set_nrows(0)
!
irmin = huge(irmin)
icmin = huge(icmin)
irmax = 0
icmax = 0
Do i=1,iszr
r=(acoo%ia(i))
k=(acoo%ja(i))
! Just in case some of the conversions were out-of-range
If ((r>0).and.(k>0)) Then
l1=l1+1
acoo%val(l1) = acoo%val(i)
acoo%ia(l1) = r
acoo%ja(l1) = k
irmin = min(irmin,r)
irmax = max(irmax,r)
icmin = min(icmin,k)
icmax = max(icmax,k)
End If
Enddo
if (rowscale_) then
call acoo%set_nrows(max(irmax-irmin+1,0))
acoo%ia(1:l1) = acoo%ia(1:l1) - irmin + 1
else
call acoo%set_nrows(irmax)
end if
if (colscale_) then
call acoo%set_ncols(max(icmax-icmin+1,0))
acoo%ja(1:l1) = acoo%ja(1:l1) - icmin + 1
else
call acoo%set_ncols(icmax)
end if
call acoo%set_nzeros(l1)
end select
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),&

Loading…
Cancel
Save