diff --git a/util/psb_c_mat_dist_impl.f90 b/util/psb_c_mat_dist_impl.f90 index dfaeac25..7767ce2e 100644 --- a/util/psb_c_mat_dist_impl.f90 +++ b/util/psb_c_mat_dist_impl.f90 @@ -30,7 +30,7 @@ ! ! subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -83,14 +83,15 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_c_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) complex(psb_spk_), allocatable :: val(:) @@ -110,6 +111,16 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& root = psb_root_ end if call psb_info(ictxt, iam, np) + + use_parts = present(parts) + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" vg, vsz, parts") + goto 9999 + endif + if (iam == root) then nrow = a_glob%get_nrows() ncol = a_glob%get_ncols() @@ -121,16 +132,13 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& endif nnzero = a_glob%get_nzeros() nrhs = 1 + if (use_vsz) then + if (sum(vsz(1:np)) /= nrow) then + write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np)) + end if + end if + endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -149,8 +157,10 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -178,7 +188,10 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -202,17 +215,27 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -288,6 +311,12 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) @@ -342,7 +371,7 @@ end subroutine psb_cmatdist subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -395,15 +424,16 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_c_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) + ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: err_act, il, inz integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& & i, ll, nz, isize, nnr, err - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) complex(psb_spk_), allocatable :: val(:) @@ -437,10 +467,11 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& endif use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") + call psb_errpush(info,name,a_err=" vg, vsz, parts") goto 9999 endif @@ -462,8 +493,10 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -492,7 +525,10 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -516,17 +552,27 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -604,6 +650,12 @@ subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) diff --git a/util/psb_c_mat_dist_mod.f90 b/util/psb_c_mat_dist_mod.f90 index 422f4e96..c46efd2f 100644 --- a/util/psb_c_mat_dist_mod.f90 +++ b/util/psb_c_mat_dist_mod.f90 @@ -36,7 +36,7 @@ module psb_c_mat_dist_mod interface psb_matdist subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -90,10 +90,11 @@ module psb_c_mat_dist_mod character(len=*), optional :: fmt class(psb_c_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_cmatdist subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -148,7 +149,8 @@ module psb_c_mat_dist_mod character(len=*), optional :: fmt class(psb_c_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_lcmatdist end interface diff --git a/util/psb_d_mat_dist_impl.f90 b/util/psb_d_mat_dist_impl.f90 index 90236db2..17df5f69 100644 --- a/util/psb_d_mat_dist_impl.f90 +++ b/util/psb_d_mat_dist_impl.f90 @@ -30,7 +30,7 @@ ! ! subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -83,14 +83,15 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_d_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) real(psb_dpk_), allocatable :: val(:) @@ -110,6 +111,16 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& root = psb_root_ end if call psb_info(ictxt, iam, np) + + use_parts = present(parts) + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" vg, vsz, parts") + goto 9999 + endif + if (iam == root) then nrow = a_glob%get_nrows() ncol = a_glob%get_ncols() @@ -121,16 +132,13 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& endif nnzero = a_glob%get_nzeros() nrhs = 1 + if (use_vsz) then + if (sum(vsz(1:np)) /= nrow) then + write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np)) + end if + end if + endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -149,8 +157,10 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -178,7 +188,10 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -202,17 +215,27 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -288,6 +311,12 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) @@ -342,7 +371,7 @@ end subroutine psb_dmatdist subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -395,15 +424,16 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_d_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) + ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: err_act, il, inz integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& & i, ll, nz, isize, nnr, err - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) real(psb_dpk_), allocatable :: val(:) @@ -437,10 +467,11 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& endif use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") + call psb_errpush(info,name,a_err=" vg, vsz, parts") goto 9999 endif @@ -462,8 +493,10 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -492,7 +525,10 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -516,17 +552,27 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -604,6 +650,12 @@ subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) diff --git a/util/psb_d_mat_dist_mod.f90 b/util/psb_d_mat_dist_mod.f90 index beb7f113..dc0e4958 100644 --- a/util/psb_d_mat_dist_mod.f90 +++ b/util/psb_d_mat_dist_mod.f90 @@ -36,7 +36,7 @@ module psb_d_mat_dist_mod interface psb_matdist subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -90,10 +90,11 @@ module psb_d_mat_dist_mod character(len=*), optional :: fmt class(psb_d_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_dmatdist subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -148,7 +149,8 @@ module psb_d_mat_dist_mod character(len=*), optional :: fmt class(psb_d_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_ldmatdist end interface diff --git a/util/psb_s_mat_dist_impl.f90 b/util/psb_s_mat_dist_impl.f90 index 104340ee..2ae59c3d 100644 --- a/util/psb_s_mat_dist_impl.f90 +++ b/util/psb_s_mat_dist_impl.f90 @@ -30,7 +30,7 @@ ! ! subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -83,14 +83,15 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_s_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) real(psb_spk_), allocatable :: val(:) @@ -110,6 +111,16 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& root = psb_root_ end if call psb_info(ictxt, iam, np) + + use_parts = present(parts) + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" vg, vsz, parts") + goto 9999 + endif + if (iam == root) then nrow = a_glob%get_nrows() ncol = a_glob%get_ncols() @@ -121,16 +132,13 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& endif nnzero = a_glob%get_nzeros() nrhs = 1 + if (use_vsz) then + if (sum(vsz(1:np)) /= nrow) then + write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np)) + end if + end if + endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -149,8 +157,10 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -178,7 +188,10 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -202,17 +215,27 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -288,6 +311,12 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) @@ -342,7 +371,7 @@ end subroutine psb_smatdist subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -395,15 +424,16 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_s_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) + ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: err_act, il, inz integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& & i, ll, nz, isize, nnr, err - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) real(psb_spk_), allocatable :: val(:) @@ -437,10 +467,11 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& endif use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") + call psb_errpush(info,name,a_err=" vg, vsz, parts") goto 9999 endif @@ -462,8 +493,10 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -492,7 +525,10 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -516,17 +552,27 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -604,6 +650,12 @@ subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) diff --git a/util/psb_s_mat_dist_mod.f90 b/util/psb_s_mat_dist_mod.f90 index 3b73e5f8..c4207da7 100644 --- a/util/psb_s_mat_dist_mod.f90 +++ b/util/psb_s_mat_dist_mod.f90 @@ -36,7 +36,7 @@ module psb_s_mat_dist_mod interface psb_matdist subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -90,10 +90,11 @@ module psb_s_mat_dist_mod character(len=*), optional :: fmt class(psb_s_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_smatdist subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -148,7 +149,8 @@ module psb_s_mat_dist_mod character(len=*), optional :: fmt class(psb_s_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_lsmatdist end interface diff --git a/util/psb_z_mat_dist_impl.f90 b/util/psb_z_mat_dist_impl.f90 index 818f28e4..d45382f3 100644 --- a/util/psb_z_mat_dist_impl.f90 +++ b/util/psb_z_mat_dist_impl.f90 @@ -30,7 +30,7 @@ ! ! subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -83,14 +83,15 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_z_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: k_count, root, liwork, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) complex(psb_dpk_), allocatable :: val(:) @@ -110,6 +111,16 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& root = psb_root_ end if call psb_info(ictxt, iam, np) + + use_parts = present(parts) + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" vg, vsz, parts") + goto 9999 + endif + if (iam == root) then nrow = a_glob%get_nrows() ncol = a_glob%get_ncols() @@ -121,16 +132,13 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& endif nnzero = a_glob%get_nzeros() nrhs = 1 + if (use_vsz) then + if (sum(vsz(1:np)) /= nrow) then + write(0,*) 'Input data mismatch :',nrow,sum(vsz(1:np)) + end if + end if + endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -149,8 +157,10 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -178,7 +188,10 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -202,17 +215,27 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -288,6 +311,12 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) @@ -342,7 +371,7 @@ end subroutine psb_zmatdist subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -395,15 +424,16 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& character(len=*), optional :: fmt class(psb_z_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) + ! local variables - logical :: use_parts, use_v + logical :: use_parts, use_vg, use_vsz integer(psb_ipk_) :: np, iam, np_sharing, root, iproc integer(psb_ipk_) :: err_act, il, inz integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& & i, ll, nz, isize, nnr, err - integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig, lastigp integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) integer(psb_lpk_), allocatable :: irow(:),icol(:) complex(psb_dpk_), allocatable :: val(:) @@ -437,10 +467,11 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& endif use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then + use_vg = present(vg) + use_vsz = present(vsz) + if (count((/ use_parts, use_vg, use_vsz /)) /= 1) then info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") + call psb_errpush(info,name,a_err=" vg, vsz, parts") goto 9999 endif @@ -462,8 +493,10 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) + else if (use_vg) then + call psb_cdall(ictxt,desc_a,info,vg=vg) + else if (use_vsz) then + call psb_cdall(ictxt,desc_a,info,nl=vsz(iam+1)) else info = -1 end if @@ -492,7 +525,10 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& end if i_count = 1 - + if (use_vsz) then + iproc = 0 + lastigp = vsz(iproc+1) + end if do while (i_count <= nrow) if (use_parts) then @@ -516,17 +552,27 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& if (iwrk2(1) /= iproc ) exit end do end if - else + else if (use_vg) then np_sharing = 1 j_count = i_count - iproc = v(i_count) + iproc = vg(i_count) iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit + if (vg(j_count) /= iproc ) exit end do + else if (use_vsz) then + np_sharing = 1 + j_count = i_count + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (j_count > lastigp) exit + end do end if ! now we should insert rows i_count..j_count-1 @@ -604,6 +650,12 @@ subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& end do endif i_count = j_count + if ((use_vsz).and.(j_count <= nrow)) then + if (j_count > lastigp) then + iproc = iproc + 1 + lastigp = lastigp + vsz(iproc+1) + end if + end if end do call psb_barrier(ictxt) diff --git a/util/psb_z_mat_dist_mod.f90 b/util/psb_z_mat_dist_mod.f90 index 7d769101..d10200dd 100644 --- a/util/psb_z_mat_dist_mod.f90 +++ b/util/psb_z_mat_dist_mod.f90 @@ -36,7 +36,7 @@ module psb_z_mat_dist_mod interface psb_matdist subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -90,10 +90,11 @@ module psb_z_mat_dist_mod character(len=*), optional :: fmt class(psb_z_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_zmatdist subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& - & info, parts, v, inroot,fmt,mold) + & info, parts, vg, vsz, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using @@ -148,7 +149,8 @@ module psb_z_mat_dist_mod character(len=*), optional :: fmt class(psb_z_base_sparse_mat), optional :: mold procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + integer(psb_ipk_), optional :: vg(:) + integer(psb_ipk_), optional :: vsz(:) end subroutine psb_lzmatdist end interface