New VSZ option for matdist

pizdaint-runs
Salvatore Filippone 5 years ago
parent 7c491f06f9
commit 20a5aa9e9a

@ -30,7 +30,7 @@
! !
! !
subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -83,14 +83,15 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act & i, ll, nz, isize, iproc, nnr, err, err_act
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
complex(psb_spk_), allocatable :: val(:) complex(psb_spk_), allocatable :: val(:)
@ -110,6 +111,16 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
root = psb_root_ root = psb_root_
end if end if
call psb_info(ictxt, iam, np) call psb_info(ictxt, iam, np)
use_parts = present(parts)
use_vg = present(vg)
use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999
endif
if (iam == root) then if (iam == root) then
nrow = a_glob%get_nrows() nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols() ncol = a_glob%get_ncols()
@ -121,16 +132,13 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
endif endif
nnzero = a_glob%get_nzeros() nnzero = a_glob%get_nzeros()
nrhs = 1 nrhs = 1
endif if (use_vsz) then
if (sum(vsz(1:np)) /= nrow) then
write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np))
end if
end if
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif endif
! broadcast informations to other processors ! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root) call psb_bcast(ictxt,ncol, root)
@ -149,8 +157,10 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -178,7 +188,10 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -202,16 +215,26 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -288,6 +311,12 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)
@ -342,7 +371,7 @@ end subroutine psb_cmatdist
subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -395,15 +424,16 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err & i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
complex(psb_spk_), allocatable :: val(:) complex(psb_spk_), allocatable :: val(:)
@ -437,10 +467,11 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
endif endif
use_parts = present(parts) use_parts = present(parts)
use_v = present(v) use_vg = present(vg)
if (count((/ use_parts, use_v /)) /= 1) then use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_ info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts") call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999 goto 9999
endif endif
@ -462,8 +493,10 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -492,7 +525,10 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -516,16 +552,26 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -604,6 +650,12 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)

@ -36,7 +36,7 @@ module psb_c_mat_dist_mod
interface psb_matdist interface psb_matdist
subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -90,10 +90,11 @@ module psb_c_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_cmatdist end subroutine psb_cmatdist
subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -148,7 +149,8 @@ module psb_c_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_lcmatdist end subroutine psb_lcmatdist
end interface end interface

@ -30,7 +30,7 @@
! !
! !
subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -83,14 +83,15 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act & i, ll, nz, isize, iproc, nnr, err, err_act
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
real(psb_dpk_), allocatable :: val(:) real(psb_dpk_), allocatable :: val(:)
@ -110,6 +111,16 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
root = psb_root_ root = psb_root_
end if end if
call psb_info(ictxt, iam, np) call psb_info(ictxt, iam, np)
use_parts = present(parts)
use_vg = present(vg)
use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999
endif
if (iam == root) then if (iam == root) then
nrow = a_glob%get_nrows() nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols() ncol = a_glob%get_ncols()
@ -121,16 +132,13 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
endif endif
nnzero = a_glob%get_nzeros() nnzero = a_glob%get_nzeros()
nrhs = 1 nrhs = 1
endif if (use_vsz) then
if (sum(vsz(1:np)) /= nrow) then
write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np))
end if
end if
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif endif
! broadcast informations to other processors ! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root) call psb_bcast(ictxt,ncol, root)
@ -149,8 +157,10 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -178,7 +188,10 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -202,16 +215,26 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -288,6 +311,12 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)
@ -342,7 +371,7 @@ end subroutine psb_dmatdist
subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -395,15 +424,16 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err & i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
real(psb_dpk_), allocatable :: val(:) real(psb_dpk_), allocatable :: val(:)
@ -437,10 +467,11 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
endif endif
use_parts = present(parts) use_parts = present(parts)
use_v = present(v) use_vg = present(vg)
if (count((/ use_parts, use_v /)) /= 1) then use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_ info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts") call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999 goto 9999
endif endif
@ -462,8 +493,10 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -492,7 +525,10 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -516,16 +552,26 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -604,6 +650,12 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)

@ -36,7 +36,7 @@ module psb_d_mat_dist_mod
interface psb_matdist interface psb_matdist
subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -90,10 +90,11 @@ module psb_d_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_dmatdist end subroutine psb_dmatdist
subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -148,7 +149,8 @@ module psb_d_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_ldmatdist end subroutine psb_ldmatdist
end interface end interface

@ -30,7 +30,7 @@
! !
! !
subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -83,14 +83,15 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act & i, ll, nz, isize, iproc, nnr, err, err_act
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
real(psb_spk_), allocatable :: val(:) real(psb_spk_), allocatable :: val(:)
@ -110,6 +111,16 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
root = psb_root_ root = psb_root_
end if end if
call psb_info(ictxt, iam, np) call psb_info(ictxt, iam, np)
use_parts = present(parts)
use_vg = present(vg)
use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999
endif
if (iam == root) then if (iam == root) then
nrow = a_glob%get_nrows() nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols() ncol = a_glob%get_ncols()
@ -121,16 +132,13 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
endif endif
nnzero = a_glob%get_nzeros() nnzero = a_glob%get_nzeros()
nrhs = 1 nrhs = 1
endif if (use_vsz) then
if (sum(vsz(1:np)) /= nrow) then
write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np))
end if
end if
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif endif
! broadcast informations to other processors ! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root) call psb_bcast(ictxt,ncol, root)
@ -149,8 +157,10 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -178,7 +188,10 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -202,16 +215,26 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -288,6 +311,12 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)
@ -342,7 +371,7 @@ end subroutine psb_smatdist
subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -395,15 +424,16 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err & i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
real(psb_spk_), allocatable :: val(:) real(psb_spk_), allocatable :: val(:)
@ -437,10 +467,11 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
endif endif
use_parts = present(parts) use_parts = present(parts)
use_v = present(v) use_vg = present(vg)
if (count((/ use_parts, use_v /)) /= 1) then use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_ info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts") call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999 goto 9999
endif endif
@ -462,8 +493,10 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -492,7 +525,10 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -516,16 +552,26 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -604,6 +650,12 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)

@ -36,7 +36,7 @@ module psb_s_mat_dist_mod
interface psb_matdist interface psb_matdist
subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -90,10 +90,11 @@ module psb_s_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_smatdist end subroutine psb_smatdist
subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -148,7 +149,8 @@ module psb_s_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_lsmatdist end subroutine psb_lsmatdist
end interface end interface

@ -30,7 +30,7 @@
! !
! !
subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -83,14 +83,15 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act & i, ll, nz, isize, iproc, nnr, err, err_act
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
complex(psb_dpk_), allocatable :: val(:) complex(psb_dpk_), allocatable :: val(:)
@ -110,6 +111,16 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
root = psb_root_ root = psb_root_
end if end if
call psb_info(ictxt, iam, np) call psb_info(ictxt, iam, np)
use_parts = present(parts)
use_vg = present(vg)
use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999
endif
if (iam == root) then if (iam == root) then
nrow = a_glob%get_nrows() nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols() ncol = a_glob%get_ncols()
@ -121,16 +132,13 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
endif endif
nnzero = a_glob%get_nzeros() nnzero = a_glob%get_nzeros()
nrhs = 1 nrhs = 1
endif if (use_vsz) then
if (sum(vsz(1:np)) /= nrow) then
write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np))
end if
end if
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif endif
! broadcast informations to other processors ! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root) call psb_bcast(ictxt,ncol, root)
@ -149,8 +157,10 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -178,7 +188,10 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -202,16 +215,26 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -288,6 +311,12 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)
@ -342,7 +371,7 @@ end subroutine psb_zmatdist
subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -395,15 +424,16 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
! local variables ! local variables
logical :: use_parts, use_v logical :: use_parts, use_vg, use_vsz
integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err & i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:) integer(psb_lpk_), allocatable :: irow(:),icol(:)
complex(psb_dpk_), allocatable :: val(:) complex(psb_dpk_), allocatable :: val(:)
@ -437,10 +467,11 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
endif endif
use_parts = present(parts) use_parts = present(parts)
use_v = present(v) use_vg = present(vg)
if (count((/ use_parts, use_v /)) /= 1) then use_vsz = present(vsz)
if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then
info=psb_err_no_optional_arg_ info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts") call psb_errpush(info,name,a_err=" vg, vsz, parts")
goto 9999 goto 9999
endif endif
@ -462,8 +493,10 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
endif endif
if (use_parts) then if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then else if (use_vg) then
call psb_cdall(ictxt,desc_a,info,vg=v) call psb_cdall(ictxt,desc_a,info,vg=vg)
else if (use_vsz) then
call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1))
else else
info = -1 info = -1
end if end if
@ -492,7 +525,10 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
end if end if
i_count = 1 i_count = 1
if (use_vsz) then
iproc = 0
lastigp = vsz(iproc+1)
end if
do while (i_count <= nrow) do while (i_count <= nrow)
if (use_parts) then if (use_parts) then
@ -516,16 +552,26 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
if (iwrk2(1) /= iproc ) exit if (iwrk2(1) /= iproc ) exit
end do end do
end if end if
else else if (use_vg) then
np_sharing = 1 np_sharing = 1
j_count = i_count j_count = i_count
iproc = v(i_count) iproc = vg(i_count)
iwork(1:np_sharing) = iproc iwork(1:np_sharing) = iproc
do do
j_count = j_count + 1 j_count = j_count + 1
if (j_count-i_count >= nb) exit if (j_count-i_count >= nb) exit
if (j_count > nrow) exit if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit if (vg(j_count) /= iproc ) exit
end do
else if (use_vsz) then
np_sharing = 1
j_count = i_count
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (j_count > lastigp) exit
end do end do
end if end if
@ -604,6 +650,12 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
end do end do
endif endif
i_count = j_count i_count = j_count
if ((use_vsz).and.(j_count <= nrow)) then
if (j_count > lastigp) then
iproc = iproc + 1
lastigp = lastigp + vsz(iproc+1)
end if
end if
end do end do
call psb_barrier(ictxt) call psb_barrier(ictxt)

@ -36,7 +36,7 @@ module psb_z_mat_dist_mod
interface psb_matdist interface psb_matdist
subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -90,10 +90,11 @@ module psb_z_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_zmatdist end subroutine psb_zmatdist
subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold) & info, parts, vg, vsz, inroot,fmt,mold)
! !
! an utility subroutine to distribute a matrix among processors ! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using ! according to a user defined data distribution, using
@ -148,7 +149,8 @@ module psb_z_mat_dist_mod
character(len=*), optional :: fmt character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:) integer(psb_ipk_), optional :: vg(:)
integer(psb_ipk_), optional :: vsz(:)
end subroutine psb_lzmatdist end subroutine psb_lzmatdist
end interface end interface

Loading…
Cancel
Save