diff --git a/base/modules/serial/psb_c_mat_mod.F90 b/base/modules/serial/psb_c_mat_mod.F90 index e3e8ab69..5375ab18 100644 --- a/base/modules/serial/psb_c_mat_mod.F90 +++ b/base/modules/serial/psb_c_mat_mod.F90 @@ -111,6 +111,8 @@ module psb_c_mat_mod procedure, pass(a) :: is_repeatable_updates => psb_c_is_repeatable_updates procedure, pass(a) :: get_fmt => psb_c_get_fmt procedure, pass(a) :: sizeof => psb_c_sizeof + procedure, pass(a) :: is_remote_build => psb_c_is_remote_build + ! Setters procedure, pass(a) :: set_nrows => psb_c_set_nrows @@ -127,6 +129,7 @@ module psb_c_mat_mod procedure, pass(a) :: set_symmetric => psb_c_set_symmetric procedure, pass(a) :: set_unit => psb_c_set_unit procedure, pass(a) :: set_repeatable_updates => psb_c_set_repeatable_updates + procedure, pass(a) :: set_remote_build => psb_c_set_remote_build ! Memory/data management procedure, pass(a) :: csall => psb_c_csall @@ -2294,7 +2297,25 @@ contains end function c_mat_is_sync + function psb_c_is_remote_build(a) result(res) + implicit none + class(psb_cspmat_type), intent(in) :: a + logical :: res + res = (a%remote_build == psb_matbld_remote_) + end function psb_c_is_remote_build + subroutine psb_c_set_remote_build(a,val) + implicit none + class(psb_cspmat_type), intent(inout) :: a + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + a%remote_build = val + else + a%remote_build = psb_matbld_remote_ + end if + end subroutine psb_c_set_remote_build + function psb_c_is_repeatable_updates(a) result(res) implicit none class(psb_cspmat_type), intent(in) :: a diff --git a/base/modules/serial/psb_c_vect_mod.F90 b/base/modules/serial/psb_c_vect_mod.F90 index 0edb7245..2166cf13 100644 --- a/base/modules/serial/psb_c_vect_mod.F90 +++ b/base/modules/serial/psb_c_vect_mod.F90 @@ -44,10 +44,16 @@ module psb_c_vect_mod type psb_c_vect_type class(psb_c_base_vect_type), allocatable :: v + integer(psb_ipk_) :: nrmv = 0 + integer(psb_ipk_) :: remote_build=psb_matbld_noremote_ + complex(psb_spk_), allocatable :: rmtv(:) + integer(psb_lpk_), allocatable :: rmidx(:) contains procedure, pass(x) :: get_nrows => c_vect_get_nrows procedure, pass(x) :: sizeof => c_vect_sizeof procedure, pass(x) :: get_fmt => c_vect_get_fmt + procedure, pass(x) :: is_remote_build => psb_c_is_remote_build + procedure, pass(x) :: set_remote_build => psb_c_set_remote_build procedure, pass(x) :: all => c_vect_all procedure, pass(x) :: reall => c_vect_reall procedure, pass(x) :: zero => c_vect_zero @@ -168,6 +174,25 @@ module psb_c_vect_mod contains + function psb_c_is_remote_build(x) result(res) + implicit none + class(psb_c_vect_type), intent(in) :: x + logical :: res + res = (x%remote_build == psb_matbld_remote_) + end function psb_c_is_remote_build + + subroutine psb_c_set_remote_build(x,val) + implicit none + class(psb_c_vect_type), intent(inout) :: x + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + x%remote_build = val + else + x%remote_build = psb_matbld_remote_ + end if + end subroutine psb_c_set_remote_build + subroutine psb_c_set_vect_default(v) implicit none class(psb_c_base_vect_type), intent(in) :: v @@ -360,13 +385,14 @@ contains if (allocated(x%v)) res = x%v%get_fmt() end function c_vect_get_fmt - subroutine c_vect_all(n, x, info, mold) + subroutine c_vect_all(n, x, info, mold,mode) implicit none integer(psb_ipk_), intent(in) :: n class(psb_c_vect_type), intent(inout) :: x - class(psb_c_base_vect_type), intent(in), optional :: mold integer(psb_ipk_), intent(out) :: info + class(psb_c_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), intent(in), optional :: mode if (allocated(x%v)) & & call x%free(info) @@ -381,7 +407,9 @@ contains else info = psb_err_alloc_dealloc_ end if - + x%nrmv = 0 + x%remote_build = psb_matbld_noremote_ + if (present(mode)) call x%set_remote_build(mode) end subroutine c_vect_all subroutine c_vect_reall(n, x, info) @@ -412,13 +440,13 @@ contains use psi_serial_mod use psb_realloc_mod implicit none - integer(psb_ipk_), intent(in) :: n + integer(psb_ipk_), intent(in) :: n class(psb_c_vect_type), intent(inout) :: x - integer(psb_ipk_), intent(out) :: info - - if (allocated(x%v)) & - & call x%v%asb(n,info) + integer(psb_ipk_), intent(out) :: info + if (allocated(x%v)) then + call x%v%asb(n,info) + end if end subroutine c_vect_asb subroutine c_vect_gthab(n,idx,alpha,x,beta,y) diff --git a/base/modules/serial/psb_d_mat_mod.F90 b/base/modules/serial/psb_d_mat_mod.F90 index a072197e..f9a70a66 100644 --- a/base/modules/serial/psb_d_mat_mod.F90 +++ b/base/modules/serial/psb_d_mat_mod.F90 @@ -111,6 +111,8 @@ module psb_d_mat_mod procedure, pass(a) :: is_repeatable_updates => psb_d_is_repeatable_updates procedure, pass(a) :: get_fmt => psb_d_get_fmt procedure, pass(a) :: sizeof => psb_d_sizeof + procedure, pass(a) :: is_remote_build => psb_d_is_remote_build + ! Setters procedure, pass(a) :: set_nrows => psb_d_set_nrows @@ -127,6 +129,7 @@ module psb_d_mat_mod procedure, pass(a) :: set_symmetric => psb_d_set_symmetric procedure, pass(a) :: set_unit => psb_d_set_unit procedure, pass(a) :: set_repeatable_updates => psb_d_set_repeatable_updates + procedure, pass(a) :: set_remote_build => psb_d_set_remote_build ! Memory/data management procedure, pass(a) :: csall => psb_d_csall @@ -2294,7 +2297,25 @@ contains end function d_mat_is_sync + function psb_d_is_remote_build(a) result(res) + implicit none + class(psb_dspmat_type), intent(in) :: a + logical :: res + res = (a%remote_build == psb_matbld_remote_) + end function psb_d_is_remote_build + subroutine psb_d_set_remote_build(a,val) + implicit none + class(psb_dspmat_type), intent(inout) :: a + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + a%remote_build = val + else + a%remote_build = psb_matbld_remote_ + end if + end subroutine psb_d_set_remote_build + function psb_d_is_repeatable_updates(a) result(res) implicit none class(psb_dspmat_type), intent(in) :: a diff --git a/base/modules/serial/psb_d_vect_mod.F90 b/base/modules/serial/psb_d_vect_mod.F90 index ec928584..b419ae6a 100644 --- a/base/modules/serial/psb_d_vect_mod.F90 +++ b/base/modules/serial/psb_d_vect_mod.F90 @@ -44,10 +44,16 @@ module psb_d_vect_mod type psb_d_vect_type class(psb_d_base_vect_type), allocatable :: v + integer(psb_ipk_) :: nrmv = 0 + integer(psb_ipk_) :: remote_build=psb_matbld_noremote_ + real(psb_dpk_), allocatable :: rmtv(:) + integer(psb_lpk_), allocatable :: rmidx(:) contains procedure, pass(x) :: get_nrows => d_vect_get_nrows procedure, pass(x) :: sizeof => d_vect_sizeof procedure, pass(x) :: get_fmt => d_vect_get_fmt + procedure, pass(x) :: is_remote_build => psb_d_is_remote_build + procedure, pass(x) :: set_remote_build => psb_d_set_remote_build procedure, pass(x) :: all => d_vect_all procedure, pass(x) :: reall => d_vect_reall procedure, pass(x) :: zero => d_vect_zero @@ -175,6 +181,25 @@ module psb_d_vect_mod contains + function psb_d_is_remote_build(x) result(res) + implicit none + class(psb_d_vect_type), intent(in) :: x + logical :: res + res = (x%remote_build == psb_matbld_remote_) + end function psb_d_is_remote_build + + subroutine psb_d_set_remote_build(x,val) + implicit none + class(psb_d_vect_type), intent(inout) :: x + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + x%remote_build = val + else + x%remote_build = psb_matbld_remote_ + end if + end subroutine psb_d_set_remote_build + subroutine psb_d_set_vect_default(v) implicit none class(psb_d_base_vect_type), intent(in) :: v @@ -367,13 +392,14 @@ contains if (allocated(x%v)) res = x%v%get_fmt() end function d_vect_get_fmt - subroutine d_vect_all(n, x, info, mold) + subroutine d_vect_all(n, x, info, mold,mode) implicit none integer(psb_ipk_), intent(in) :: n class(psb_d_vect_type), intent(inout) :: x - class(psb_d_base_vect_type), intent(in), optional :: mold integer(psb_ipk_), intent(out) :: info + class(psb_d_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), intent(in), optional :: mode if (allocated(x%v)) & & call x%free(info) @@ -388,7 +414,9 @@ contains else info = psb_err_alloc_dealloc_ end if - + x%nrmv = 0 + x%remote_build = psb_matbld_noremote_ + if (present(mode)) call x%set_remote_build(mode) end subroutine d_vect_all subroutine d_vect_reall(n, x, info) @@ -419,13 +447,13 @@ contains use psi_serial_mod use psb_realloc_mod implicit none - integer(psb_ipk_), intent(in) :: n + integer(psb_ipk_), intent(in) :: n class(psb_d_vect_type), intent(inout) :: x - integer(psb_ipk_), intent(out) :: info - - if (allocated(x%v)) & - & call x%v%asb(n,info) + integer(psb_ipk_), intent(out) :: info + if (allocated(x%v)) then + call x%v%asb(n,info) + end if end subroutine d_vect_asb subroutine d_vect_gthab(n,idx,alpha,x,beta,y) diff --git a/base/modules/serial/psb_i_vect_mod.F90 b/base/modules/serial/psb_i_vect_mod.F90 index 75064b81..6cebf62a 100644 --- a/base/modules/serial/psb_i_vect_mod.F90 +++ b/base/modules/serial/psb_i_vect_mod.F90 @@ -43,10 +43,16 @@ module psb_i_vect_mod type psb_i_vect_type class(psb_i_base_vect_type), allocatable :: v + integer(psb_ipk_) :: nrmv = 0 + integer(psb_ipk_) :: remote_build=psb_matbld_noremote_ + integer(psb_ipk_), allocatable :: rmtv(:) + integer(psb_lpk_), allocatable :: rmidx(:) contains procedure, pass(x) :: get_nrows => i_vect_get_nrows procedure, pass(x) :: sizeof => i_vect_sizeof procedure, pass(x) :: get_fmt => i_vect_get_fmt + procedure, pass(x) :: is_remote_build => psb_i_is_remote_build + procedure, pass(x) :: set_remote_build => psb_i_set_remote_build procedure, pass(x) :: all => i_vect_all procedure, pass(x) :: reall => i_vect_reall procedure, pass(x) :: zero => i_vect_zero @@ -115,6 +121,25 @@ module psb_i_vect_mod contains + function psb_i_is_remote_build(x) result(res) + implicit none + class(psb_i_vect_type), intent(in) :: x + logical :: res + res = (x%remote_build == psb_matbld_remote_) + end function psb_i_is_remote_build + + subroutine psb_i_set_remote_build(x,val) + implicit none + class(psb_i_vect_type), intent(inout) :: x + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + x%remote_build = val + else + x%remote_build = psb_matbld_remote_ + end if + end subroutine psb_i_set_remote_build + subroutine psb_i_set_vect_default(v) implicit none class(psb_i_base_vect_type), intent(in) :: v @@ -307,13 +332,14 @@ contains if (allocated(x%v)) res = x%v%get_fmt() end function i_vect_get_fmt - subroutine i_vect_all(n, x, info, mold) + subroutine i_vect_all(n, x, info, mold,mode) implicit none integer(psb_ipk_), intent(in) :: n class(psb_i_vect_type), intent(inout) :: x - class(psb_i_base_vect_type), intent(in), optional :: mold integer(psb_ipk_), intent(out) :: info + class(psb_i_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), intent(in), optional :: mode if (allocated(x%v)) & & call x%free(info) @@ -328,7 +354,9 @@ contains else info = psb_err_alloc_dealloc_ end if - + x%nrmv = 0 + x%remote_build = psb_matbld_noremote_ + if (present(mode)) call x%set_remote_build(mode) end subroutine i_vect_all subroutine i_vect_reall(n, x, info) @@ -359,13 +387,13 @@ contains use psi_serial_mod use psb_realloc_mod implicit none - integer(psb_ipk_), intent(in) :: n + integer(psb_ipk_), intent(in) :: n class(psb_i_vect_type), intent(inout) :: x - integer(psb_ipk_), intent(out) :: info - - if (allocated(x%v)) & - & call x%v%asb(n,info) + integer(psb_ipk_), intent(out) :: info + if (allocated(x%v)) then + call x%v%asb(n,info) + end if end subroutine i_vect_asb subroutine i_vect_gthab(n,idx,alpha,x,beta,y) diff --git a/base/modules/serial/psb_l_vect_mod.F90 b/base/modules/serial/psb_l_vect_mod.F90 index 3c86f8a2..df5691f4 100644 --- a/base/modules/serial/psb_l_vect_mod.F90 +++ b/base/modules/serial/psb_l_vect_mod.F90 @@ -44,10 +44,16 @@ module psb_l_vect_mod type psb_l_vect_type class(psb_l_base_vect_type), allocatable :: v + integer(psb_ipk_) :: nrmv = 0 + integer(psb_ipk_) :: remote_build=psb_matbld_noremote_ + integer(psb_lpk_), allocatable :: rmtv(:) + integer(psb_lpk_), allocatable :: rmidx(:) contains procedure, pass(x) :: get_nrows => l_vect_get_nrows procedure, pass(x) :: sizeof => l_vect_sizeof procedure, pass(x) :: get_fmt => l_vect_get_fmt + procedure, pass(x) :: is_remote_build => psb_l_is_remote_build + procedure, pass(x) :: set_remote_build => psb_l_set_remote_build procedure, pass(x) :: all => l_vect_all procedure, pass(x) :: reall => l_vect_reall procedure, pass(x) :: zero => l_vect_zero @@ -116,6 +122,25 @@ module psb_l_vect_mod contains + function psb_l_is_remote_build(x) result(res) + implicit none + class(psb_l_vect_type), intent(in) :: x + logical :: res + res = (x%remote_build == psb_matbld_remote_) + end function psb_l_is_remote_build + + subroutine psb_l_set_remote_build(x,val) + implicit none + class(psb_l_vect_type), intent(inout) :: x + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + x%remote_build = val + else + x%remote_build = psb_matbld_remote_ + end if + end subroutine psb_l_set_remote_build + subroutine psb_l_set_vect_default(v) implicit none class(psb_l_base_vect_type), intent(in) :: v @@ -308,13 +333,14 @@ contains if (allocated(x%v)) res = x%v%get_fmt() end function l_vect_get_fmt - subroutine l_vect_all(n, x, info, mold) + subroutine l_vect_all(n, x, info, mold,mode) implicit none integer(psb_ipk_), intent(in) :: n class(psb_l_vect_type), intent(inout) :: x - class(psb_l_base_vect_type), intent(in), optional :: mold integer(psb_ipk_), intent(out) :: info + class(psb_l_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), intent(in), optional :: mode if (allocated(x%v)) & & call x%free(info) @@ -329,7 +355,9 @@ contains else info = psb_err_alloc_dealloc_ end if - + x%nrmv = 0 + x%remote_build = psb_matbld_noremote_ + if (present(mode)) call x%set_remote_build(mode) end subroutine l_vect_all subroutine l_vect_reall(n, x, info) @@ -360,13 +388,13 @@ contains use psi_serial_mod use psb_realloc_mod implicit none - integer(psb_ipk_), intent(in) :: n + integer(psb_ipk_), intent(in) :: n class(psb_l_vect_type), intent(inout) :: x - integer(psb_ipk_), intent(out) :: info - - if (allocated(x%v)) & - & call x%v%asb(n,info) + integer(psb_ipk_), intent(out) :: info + if (allocated(x%v)) then + call x%v%asb(n,info) + end if end subroutine l_vect_asb subroutine l_vect_gthab(n,idx,alpha,x,beta,y) diff --git a/base/modules/serial/psb_s_mat_mod.F90 b/base/modules/serial/psb_s_mat_mod.F90 index 20bf3249..c1639ad5 100644 --- a/base/modules/serial/psb_s_mat_mod.F90 +++ b/base/modules/serial/psb_s_mat_mod.F90 @@ -111,6 +111,8 @@ module psb_s_mat_mod procedure, pass(a) :: is_repeatable_updates => psb_s_is_repeatable_updates procedure, pass(a) :: get_fmt => psb_s_get_fmt procedure, pass(a) :: sizeof => psb_s_sizeof + procedure, pass(a) :: is_remote_build => psb_s_is_remote_build + ! Setters procedure, pass(a) :: set_nrows => psb_s_set_nrows @@ -127,6 +129,7 @@ module psb_s_mat_mod procedure, pass(a) :: set_symmetric => psb_s_set_symmetric procedure, pass(a) :: set_unit => psb_s_set_unit procedure, pass(a) :: set_repeatable_updates => psb_s_set_repeatable_updates + procedure, pass(a) :: set_remote_build => psb_s_set_remote_build ! Memory/data management procedure, pass(a) :: csall => psb_s_csall @@ -2294,7 +2297,25 @@ contains end function s_mat_is_sync + function psb_s_is_remote_build(a) result(res) + implicit none + class(psb_sspmat_type), intent(in) :: a + logical :: res + res = (a%remote_build == psb_matbld_remote_) + end function psb_s_is_remote_build + subroutine psb_s_set_remote_build(a,val) + implicit none + class(psb_sspmat_type), intent(inout) :: a + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + a%remote_build = val + else + a%remote_build = psb_matbld_remote_ + end if + end subroutine psb_s_set_remote_build + function psb_s_is_repeatable_updates(a) result(res) implicit none class(psb_sspmat_type), intent(in) :: a diff --git a/base/modules/serial/psb_s_vect_mod.F90 b/base/modules/serial/psb_s_vect_mod.F90 index 8f378c6d..52380007 100644 --- a/base/modules/serial/psb_s_vect_mod.F90 +++ b/base/modules/serial/psb_s_vect_mod.F90 @@ -44,10 +44,16 @@ module psb_s_vect_mod type psb_s_vect_type class(psb_s_base_vect_type), allocatable :: v + integer(psb_ipk_) :: nrmv = 0 + integer(psb_ipk_) :: remote_build=psb_matbld_noremote_ + real(psb_spk_), allocatable :: rmtv(:) + integer(psb_lpk_), allocatable :: rmidx(:) contains procedure, pass(x) :: get_nrows => s_vect_get_nrows procedure, pass(x) :: sizeof => s_vect_sizeof procedure, pass(x) :: get_fmt => s_vect_get_fmt + procedure, pass(x) :: is_remote_build => psb_s_is_remote_build + procedure, pass(x) :: set_remote_build => psb_s_set_remote_build procedure, pass(x) :: all => s_vect_all procedure, pass(x) :: reall => s_vect_reall procedure, pass(x) :: zero => s_vect_zero @@ -175,6 +181,25 @@ module psb_s_vect_mod contains + function psb_s_is_remote_build(x) result(res) + implicit none + class(psb_s_vect_type), intent(in) :: x + logical :: res + res = (x%remote_build == psb_matbld_remote_) + end function psb_s_is_remote_build + + subroutine psb_s_set_remote_build(x,val) + implicit none + class(psb_s_vect_type), intent(inout) :: x + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + x%remote_build = val + else + x%remote_build = psb_matbld_remote_ + end if + end subroutine psb_s_set_remote_build + subroutine psb_s_set_vect_default(v) implicit none class(psb_s_base_vect_type), intent(in) :: v @@ -367,13 +392,14 @@ contains if (allocated(x%v)) res = x%v%get_fmt() end function s_vect_get_fmt - subroutine s_vect_all(n, x, info, mold) + subroutine s_vect_all(n, x, info, mold,mode) implicit none integer(psb_ipk_), intent(in) :: n class(psb_s_vect_type), intent(inout) :: x - class(psb_s_base_vect_type), intent(in), optional :: mold integer(psb_ipk_), intent(out) :: info + class(psb_s_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), intent(in), optional :: mode if (allocated(x%v)) & & call x%free(info) @@ -388,7 +414,9 @@ contains else info = psb_err_alloc_dealloc_ end if - + x%nrmv = 0 + x%remote_build = psb_matbld_noremote_ + if (present(mode)) call x%set_remote_build(mode) end subroutine s_vect_all subroutine s_vect_reall(n, x, info) @@ -419,13 +447,13 @@ contains use psi_serial_mod use psb_realloc_mod implicit none - integer(psb_ipk_), intent(in) :: n + integer(psb_ipk_), intent(in) :: n class(psb_s_vect_type), intent(inout) :: x - integer(psb_ipk_), intent(out) :: info - - if (allocated(x%v)) & - & call x%v%asb(n,info) + integer(psb_ipk_), intent(out) :: info + if (allocated(x%v)) then + call x%v%asb(n,info) + end if end subroutine s_vect_asb subroutine s_vect_gthab(n,idx,alpha,x,beta,y) diff --git a/base/modules/serial/psb_z_mat_mod.F90 b/base/modules/serial/psb_z_mat_mod.F90 index 1ca6941b..b84cd610 100644 --- a/base/modules/serial/psb_z_mat_mod.F90 +++ b/base/modules/serial/psb_z_mat_mod.F90 @@ -111,6 +111,8 @@ module psb_z_mat_mod procedure, pass(a) :: is_repeatable_updates => psb_z_is_repeatable_updates procedure, pass(a) :: get_fmt => psb_z_get_fmt procedure, pass(a) :: sizeof => psb_z_sizeof + procedure, pass(a) :: is_remote_build => psb_z_is_remote_build + ! Setters procedure, pass(a) :: set_nrows => psb_z_set_nrows @@ -127,6 +129,7 @@ module psb_z_mat_mod procedure, pass(a) :: set_symmetric => psb_z_set_symmetric procedure, pass(a) :: set_unit => psb_z_set_unit procedure, pass(a) :: set_repeatable_updates => psb_z_set_repeatable_updates + procedure, pass(a) :: set_remote_build => psb_z_set_remote_build ! Memory/data management procedure, pass(a) :: csall => psb_z_csall @@ -2294,7 +2297,25 @@ contains end function z_mat_is_sync + function psb_z_is_remote_build(a) result(res) + implicit none + class(psb_zspmat_type), intent(in) :: a + logical :: res + res = (a%remote_build == psb_matbld_remote_) + end function psb_z_is_remote_build + subroutine psb_z_set_remote_build(a,val) + implicit none + class(psb_zspmat_type), intent(inout) :: a + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + a%remote_build = val + else + a%remote_build = psb_matbld_remote_ + end if + end subroutine psb_z_set_remote_build + function psb_z_is_repeatable_updates(a) result(res) implicit none class(psb_zspmat_type), intent(in) :: a diff --git a/base/modules/serial/psb_z_vect_mod.F90 b/base/modules/serial/psb_z_vect_mod.F90 index 60c5b6f0..bd02ed62 100644 --- a/base/modules/serial/psb_z_vect_mod.F90 +++ b/base/modules/serial/psb_z_vect_mod.F90 @@ -44,10 +44,16 @@ module psb_z_vect_mod type psb_z_vect_type class(psb_z_base_vect_type), allocatable :: v + integer(psb_ipk_) :: nrmv = 0 + integer(psb_ipk_) :: remote_build=psb_matbld_noremote_ + complex(psb_dpk_), allocatable :: rmtv(:) + integer(psb_lpk_), allocatable :: rmidx(:) contains procedure, pass(x) :: get_nrows => z_vect_get_nrows procedure, pass(x) :: sizeof => z_vect_sizeof procedure, pass(x) :: get_fmt => z_vect_get_fmt + procedure, pass(x) :: is_remote_build => psb_z_is_remote_build + procedure, pass(x) :: set_remote_build => psb_z_set_remote_build procedure, pass(x) :: all => z_vect_all procedure, pass(x) :: reall => z_vect_reall procedure, pass(x) :: zero => z_vect_zero @@ -168,6 +174,25 @@ module psb_z_vect_mod contains + function psb_z_is_remote_build(x) result(res) + implicit none + class(psb_z_vect_type), intent(in) :: x + logical :: res + res = (x%remote_build == psb_matbld_remote_) + end function psb_z_is_remote_build + + subroutine psb_z_set_remote_build(x,val) + implicit none + class(psb_z_vect_type), intent(inout) :: x + integer(psb_ipk_), intent(in), optional :: val + + if (present(val)) then + x%remote_build = val + else + x%remote_build = psb_matbld_remote_ + end if + end subroutine psb_z_set_remote_build + subroutine psb_z_set_vect_default(v) implicit none class(psb_z_base_vect_type), intent(in) :: v @@ -360,13 +385,14 @@ contains if (allocated(x%v)) res = x%v%get_fmt() end function z_vect_get_fmt - subroutine z_vect_all(n, x, info, mold) + subroutine z_vect_all(n, x, info, mold,mode) implicit none integer(psb_ipk_), intent(in) :: n class(psb_z_vect_type), intent(inout) :: x - class(psb_z_base_vect_type), intent(in), optional :: mold integer(psb_ipk_), intent(out) :: info + class(psb_z_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), intent(in), optional :: mode if (allocated(x%v)) & & call x%free(info) @@ -381,7 +407,9 @@ contains else info = psb_err_alloc_dealloc_ end if - + x%nrmv = 0 + x%remote_build = psb_matbld_noremote_ + if (present(mode)) call x%set_remote_build(mode) end subroutine z_vect_all subroutine z_vect_reall(n, x, info) @@ -412,13 +440,13 @@ contains use psi_serial_mod use psb_realloc_mod implicit none - integer(psb_ipk_), intent(in) :: n + integer(psb_ipk_), intent(in) :: n class(psb_z_vect_type), intent(inout) :: x - integer(psb_ipk_), intent(out) :: info - - if (allocated(x%v)) & - & call x%v%asb(n,info) + integer(psb_ipk_), intent(out) :: info + if (allocated(x%v)) then + call x%v%asb(n,info) + end if end subroutine z_vect_asb subroutine z_vect_gthab(n,idx,alpha,x,beta,y) diff --git a/base/modules/tools/psb_c_tools_mod.F90 b/base/modules/tools/psb_c_tools_mod.F90 index 1aa2bcbb..3ccd5e69 100644 --- a/base/modules/tools/psb_c_tools_mod.F90 +++ b/base/modules/tools/psb_c_tools_mod.F90 @@ -67,13 +67,14 @@ Module psb_c_tools_mod interface psb_geasb - subroutine psb_casb_vect(x, desc_a, info,mold, scratch) + subroutine psb_casb_vect(x, desc_a, info,mold, dupl,scratch) import implicit none type(psb_desc_type), intent(in) :: desc_a type(psb_c_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_c_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch end subroutine psb_casb_vect subroutine psb_casb_vect_r2(x, desc_a, info,mold, scratch) @@ -262,14 +263,25 @@ Module psb_c_tools_mod end subroutine psb_cspasb end interface + interface psb_remote_vect + subroutine psb_c_remote_vect(v,desc_a, dupl, info) + import + implicit none + type(psb_c_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(in) :: dupl + integer(psb_ipk_), intent(out) :: info + end subroutine psb_c_remote_vect + end interface psb_remote_vect + interface psb_remote_mat subroutine psb_lc_remote_mat(a,desc_a,b, info) import implicit none - type(psb_lc_coo_sparse_mat),Intent(inout) :: a + type(psb_lc_coo_sparse_mat),Intent(inout) :: a type(psb_desc_type),intent(inout) :: desc_a - type(psb_lc_coo_sparse_mat),Intent(inout) :: b - integer(psb_ipk_), intent(out) :: info + type(psb_lc_coo_sparse_mat),Intent(inout) :: b + integer(psb_ipk_), intent(out) :: info end subroutine psb_lc_remote_mat end interface psb_remote_mat diff --git a/base/modules/tools/psb_d_tools_mod.F90 b/base/modules/tools/psb_d_tools_mod.F90 index b73fcb74..8756b874 100644 --- a/base/modules/tools/psb_d_tools_mod.F90 +++ b/base/modules/tools/psb_d_tools_mod.F90 @@ -67,13 +67,14 @@ Module psb_d_tools_mod interface psb_geasb - subroutine psb_dasb_vect(x, desc_a, info,mold, scratch) + subroutine psb_dasb_vect(x, desc_a, info,mold, dupl,scratch) import implicit none type(psb_desc_type), intent(in) :: desc_a type(psb_d_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_d_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch end subroutine psb_dasb_vect subroutine psb_dasb_vect_r2(x, desc_a, info,mold, scratch) @@ -262,14 +263,25 @@ Module psb_d_tools_mod end subroutine psb_dspasb end interface + interface psb_remote_vect + subroutine psb_d_remote_vect(v,desc_a, dupl, info) + import + implicit none + type(psb_d_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(in) :: dupl + integer(psb_ipk_), intent(out) :: info + end subroutine psb_d_remote_vect + end interface psb_remote_vect + interface psb_remote_mat subroutine psb_ld_remote_mat(a,desc_a,b, info) import implicit none - type(psb_ld_coo_sparse_mat),Intent(inout) :: a + type(psb_ld_coo_sparse_mat),Intent(inout) :: a type(psb_desc_type),intent(inout) :: desc_a - type(psb_ld_coo_sparse_mat),Intent(inout) :: b - integer(psb_ipk_), intent(out) :: info + type(psb_ld_coo_sparse_mat),Intent(inout) :: b + integer(psb_ipk_), intent(out) :: info end subroutine psb_ld_remote_mat end interface psb_remote_mat diff --git a/base/modules/tools/psb_i_tools_mod.F90 b/base/modules/tools/psb_i_tools_mod.F90 index 5cc6e836..ff4f1504 100644 --- a/base/modules/tools/psb_i_tools_mod.F90 +++ b/base/modules/tools/psb_i_tools_mod.F90 @@ -64,13 +64,14 @@ Module psb_i_tools_mod interface psb_geasb - subroutine psb_iasb_vect(x, desc_a, info,mold, scratch) + subroutine psb_iasb_vect(x, desc_a, info,mold, dupl,scratch) import implicit none type(psb_desc_type), intent(in) :: desc_a type(psb_i_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_i_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch end subroutine psb_iasb_vect subroutine psb_iasb_vect_r2(x, desc_a, info,mold, scratch) diff --git a/base/modules/tools/psb_l_tools_mod.F90 b/base/modules/tools/psb_l_tools_mod.F90 index 56617798..3b15ffd3 100644 --- a/base/modules/tools/psb_l_tools_mod.F90 +++ b/base/modules/tools/psb_l_tools_mod.F90 @@ -64,13 +64,14 @@ Module psb_l_tools_mod interface psb_geasb - subroutine psb_lasb_vect(x, desc_a, info,mold, scratch) + subroutine psb_lasb_vect(x, desc_a, info,mold, dupl,scratch) import implicit none type(psb_desc_type), intent(in) :: desc_a type(psb_l_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_l_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch end subroutine psb_lasb_vect subroutine psb_lasb_vect_r2(x, desc_a, info,mold, scratch) diff --git a/base/modules/tools/psb_s_tools_mod.F90 b/base/modules/tools/psb_s_tools_mod.F90 index dfa18e92..df22e689 100644 --- a/base/modules/tools/psb_s_tools_mod.F90 +++ b/base/modules/tools/psb_s_tools_mod.F90 @@ -67,13 +67,14 @@ Module psb_s_tools_mod interface psb_geasb - subroutine psb_sasb_vect(x, desc_a, info,mold, scratch) + subroutine psb_sasb_vect(x, desc_a, info,mold, dupl,scratch) import implicit none type(psb_desc_type), intent(in) :: desc_a type(psb_s_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_s_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch end subroutine psb_sasb_vect subroutine psb_sasb_vect_r2(x, desc_a, info,mold, scratch) @@ -262,14 +263,25 @@ Module psb_s_tools_mod end subroutine psb_sspasb end interface + interface psb_remote_vect + subroutine psb_s_remote_vect(v,desc_a, dupl, info) + import + implicit none + type(psb_s_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(in) :: dupl + integer(psb_ipk_), intent(out) :: info + end subroutine psb_s_remote_vect + end interface psb_remote_vect + interface psb_remote_mat subroutine psb_ls_remote_mat(a,desc_a,b, info) import implicit none - type(psb_ls_coo_sparse_mat),Intent(inout) :: a + type(psb_ls_coo_sparse_mat),Intent(inout) :: a type(psb_desc_type),intent(inout) :: desc_a - type(psb_ls_coo_sparse_mat),Intent(inout) :: b - integer(psb_ipk_), intent(out) :: info + type(psb_ls_coo_sparse_mat),Intent(inout) :: b + integer(psb_ipk_), intent(out) :: info end subroutine psb_ls_remote_mat end interface psb_remote_mat diff --git a/base/modules/tools/psb_z_tools_mod.F90 b/base/modules/tools/psb_z_tools_mod.F90 index b76893c7..8635b24b 100644 --- a/base/modules/tools/psb_z_tools_mod.F90 +++ b/base/modules/tools/psb_z_tools_mod.F90 @@ -67,13 +67,14 @@ Module psb_z_tools_mod interface psb_geasb - subroutine psb_zasb_vect(x, desc_a, info,mold, scratch) + subroutine psb_zasb_vect(x, desc_a, info,mold, dupl,scratch) import implicit none type(psb_desc_type), intent(in) :: desc_a type(psb_z_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_z_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch end subroutine psb_zasb_vect subroutine psb_zasb_vect_r2(x, desc_a, info,mold, scratch) @@ -262,14 +263,25 @@ Module psb_z_tools_mod end subroutine psb_zspasb end interface + interface psb_remote_vect + subroutine psb_z_remote_vect(v,desc_a, dupl, info) + import + implicit none + type(psb_z_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(in) :: dupl + integer(psb_ipk_), intent(out) :: info + end subroutine psb_z_remote_vect + end interface psb_remote_vect + interface psb_remote_mat subroutine psb_lz_remote_mat(a,desc_a,b, info) import implicit none - type(psb_lz_coo_sparse_mat),Intent(inout) :: a + type(psb_lz_coo_sparse_mat),Intent(inout) :: a type(psb_desc_type),intent(inout) :: desc_a - type(psb_lz_coo_sparse_mat),Intent(inout) :: b - integer(psb_ipk_), intent(out) :: info + type(psb_lz_coo_sparse_mat),Intent(inout) :: b + integer(psb_ipk_), intent(out) :: info end subroutine psb_lz_remote_mat end interface psb_remote_mat diff --git a/base/tools/psb_c_remote_mat.F90 b/base/tools/psb_c_remote_mat.F90 index fcccd26d..52a06480 100644 --- a/base/tools/psb_c_remote_mat.F90 +++ b/base/tools/psb_c_remote_mat.F90 @@ -29,9 +29,9 @@ ! POSSIBILITY OF SUCH DAMAGE. ! ! -! File: psb_csphalo.f90 +! File: psb_c_remote_mat.f90 ! -! Subroutine: psb_csphalo psb_lcsphalo +! Subroutine: ! This routine does the retrieval of remote matrix rows. ! Retrieval is done through GETROW, therefore it should work ! for any matrix format in A; as for the output, default is CSR. @@ -278,3 +278,221 @@ Subroutine psb_lc_remote_mat(a,desc_a,b,info) End Subroutine psb_lc_remote_mat + +subroutine psb_c_remote_vect(v,desc_a, info, dupl) + use psb_base_mod, psb_protect_name => psb_c_remote_vect + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_c_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_),optional, intent(in) :: dupl + + ! ...local scalars.... + type(psb_ctxt_type) :: ctxt + integer(psb_ipk_) :: np,me + integer(psb_ipk_) :: counter, proc, i, n_el_send,n_el_recv, & + & n_elem, j, ipx,mat_recv, idxs,idxr,& + & data_,totxch,nxs, nxr, ncg, dupl_ + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem + integer(psb_lpk_) :: nz,nouth + integer(psb_ipk_) :: nnp, nrcvs, nsnds + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:), sdsi(:), rvsi(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_spk_), allocatable :: valsnd(:) + integer(psb_ipk_), allocatable :: ladj(:), ila(:), iprc(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_c_remote_vect' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + if (present(dupl)) then + dupl_ = dupl + else + if (v%is_remote_build()) then + dupl_ = psb_dupl_add_ + else + dupl_ = psb_dupl_ovwrt_ + end if + endif + + ctxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ctxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + write(0,*) me, 'X_remote_vect implementation to be completed ' + +!!$ call b%free() +!!$ +!!$ Allocate(rvsz(np),sdsz(np),sdsi(np),rvsi(np),brvindx(np+1),& +!!$ & bsdindx(np+1), acoo,stat=info) +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_alloc_dealloc_ +!!$ call psb_errpush(info,name) +!!$ goto 9999 +!!$ end if +!!$ +!!$ +!!$ nz = a%get_nzeros() +!!$ allocate(ila(nz)) +!!$ !write(0,*) me,name,' size :',nz,size(ila) +!!$ call desc_a%g2l(a%ia(1:nz),ila(1:nz),info,owned=.false.) +!!$ nouth = count(ila(1:nz)<0) +!!$ !write(0,*) me,name,' Count out of halo :',nouth +!!$ call psb_max(ctxt,nouth) +!!$ if ((nouth/=0).and.(me==0)) & +!!$ & write(0,*) 'Warning: would require reinit of DESC_A' +!!$ +!!$ call psi_graph_fnd_owner(a%ia(1:nz),iprc,ladj,desc_a%indxmap,info) +!!$ call psb_msort_unique(ladj,nnp) +!!$ !write(0,*) me,name,' Processes:',ladj(1:nnp) +!!$ +!!$ icomm = desc_a%get_mpic() +!!$ sdsz(:)=0 +!!$ rvsz(:)=0 +!!$ sdsi(:)=0 +!!$ rvsi(:)=0 +!!$ ipx = 1 +!!$ brvindx(:) = 0 +!!$ bsdindx(:) = 0 +!!$ counter=1 +!!$ idx = 0 +!!$ idxs = 0 +!!$ idxr = 0 +!!$ do i=1,nz +!!$ if (iprc(i) >=0) then +!!$ sdsz(iprc(i)+1) = sdsz(iprc(i)+1) +1 +!!$ else +!!$ write(0,*)me,name,' Error from fnd_owner: ',iprc(i) +!!$ end if +!!$ end do +!!$ call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& +!!$ & rvsz,1,psb_mpi_mpk_,icomm,minfo) +!!$ if (minfo /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='mpi_alltoall') +!!$ goto 9999 +!!$ end if +!!$ !write(0,*)me,name,' sdsz ',sdsz(:),' rvsz:',rvsz(:) +!!$ nsnds = count(sdsz /= 0) +!!$ nrcvs = count(rvsz /= 0) +!!$ idxs = 0 +!!$ idxr = 0 +!!$ counter = 1 +!!$ Do proc=0,np-1 +!!$ bsdindx(proc+1) = idxs +!!$ idxs = idxs + sdsz(proc+1) +!!$ brvindx(proc+1) = idxr +!!$ idxr = idxr + rvsz(proc+1) +!!$ Enddo +!!$ +!!$ iszs = sum(sdsz) +!!$ iszr = sum(rvsz) +!!$ call acoo%allocate(desc_a%get_global_rows(),desc_a%get_global_cols(),iszr) +!!$ if (psb_errstatus_fatal()) then +!!$ write(0,*) 'Error from acoo%allocate ' +!!$ info = 4010 +!!$ goto 9999 +!!$ end if +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& +!!$ & ' Send:',sdsz(:),' Receive:',rvsz(:) +!!$ !write(debug_unit,*) me,' ',trim(name),': ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' iasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' jasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' valsnd: ',info +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='ensure_size') +!!$ goto 9999 +!!$ end if +!!$ do k=1, nz +!!$ proc = iprc(k) +!!$ sdsi(proc+1) = sdsi(proc+1) + 1 +!!$ !rvsi(proc) = rvsi(proc) + 1 +!!$ iasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ia(k) +!!$ jasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ja(k) +!!$ valsnd(bsdindx(proc+1)+sdsi(proc+1)) = a%val(k) +!!$ end do +!!$ do proc=0,np-1 +!!$ if (sdsi(proc+1) /= sdsz(proc+1)) & +!!$ & write(0,*) me,name,'Send mismacth ',sdsi(proc+1),sdsz(proc+1) +!!$ end do +!!$ +!!$ select case(psb_get_sp_a2av_alg()) +!!$ case(psb_sp_a2av_smpl_triad_) +!!$ call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& +!!$ & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_smpl_v_) +!!$ call psb_simple_a2av(valsnd,sdsz,bsdindx,& +!!$ & acoo%val,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& +!!$ & acoo%ia,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& +!!$ & acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_mpi_) +!!$ +!!$ call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_spk_,& +!!$ & acoo%val,rvsz,brvindx,psb_mpi_c_spk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo /= mpi_success) info = minfo +!!$ case default +!!$ info = psb_err_internal_error_ +!!$ call psb_errpush(info,name,a_err='wrong A2AV alg selector') +!!$ goto 9999 +!!$ end select +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='alltoallv') +!!$ goto 9999 +!!$ end if +!!$ call acoo%set_nzeros(iszr) +!!$ call acoo%mv_to_coo(b,info) +!!$ +!!$ Deallocate(brvindx,bsdindx,rvsz,sdsz,& +!!$ & iasnd,jasnd,valsnd,stat=info) +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ctxt,err_act) + + return + +End Subroutine psb_c_remote_vect + + diff --git a/base/tools/psb_casb.f90 b/base/tools/psb_casb.f90 index de2e3890..6eba0696 100644 --- a/base/tools/psb_casb.f90 +++ b/base/tools/psb_casb.f90 @@ -51,7 +51,7 @@ ! scratch - logical, optional If true, allocate without checking/zeroing contents. ! default: .false. ! -subroutine psb_casb_vect(x, desc_a, info, mold, scratch) +subroutine psb_casb_vect(x, desc_a, info, mold, dupl,scratch) use psb_base_mod, psb_protect_name => psb_casb_vect implicit none @@ -59,12 +59,13 @@ subroutine psb_casb_vect(x, desc_a, info, mold, scratch) type(psb_c_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_c_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch ! local variables type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np,me - integer(psb_ipk_) :: i1sz,nrow,ncol, err_act + integer(psb_ipk_) :: i1sz,nrow,ncol, err_act, dupl_ logical :: scratch_ integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name,ch_err @@ -82,6 +83,11 @@ subroutine psb_casb_vect(x, desc_a, info, mold, scratch) scratch_ = .false. if (present(scratch)) scratch_ = scratch + if (present(dupl)) then + dupl_ = dupl + else + dupl_ = psb_dupl_ovwrt_ + endif call psb_info(ctxt, me, np) ! ....verify blacs grid correctness.. @@ -104,6 +110,7 @@ subroutine psb_casb_vect(x, desc_a, info, mold, scratch) call x%free(info) call x%bld(ncol,mold=mold) else + if (x%is_remote_build()) call psb_c_remote_vect(x,desc_a,dupl_, info) call x%asb(ncol,info) ! ..update halo elements.. call psb_halo(x,desc_a,info) diff --git a/base/tools/psb_cins.f90 b/base/tools/psb_cins.f90 index e874c315..948a596a 100644 --- a/base/tools/psb_cins.f90 +++ b/base/tools/psb_cins.f90 @@ -112,7 +112,6 @@ subroutine psb_cins_vect(m, irw, val, x, desc_a, info, dupl,local) endif - allocate(irl(m),stat=info) if (info /= psb_success_) then info = psb_err_alloc_dealloc_ diff --git a/base/tools/psb_cspalloc.f90 b/base/tools/psb_cspalloc.f90 index 286bb4a0..5d5a1f34 100644 --- a/base/tools/psb_cspalloc.f90 +++ b/base/tools/psb_cspalloc.f90 @@ -116,22 +116,28 @@ subroutine psb_cspalloc(a, desc_a, info, nnz, bldmode) !!$ write(0,*) name,'Setting a%remote_build ',& !!$ & bldmode_,psb_matbld_noremote_,psb_matbld_remote_ - a%remote_build = bldmode_ - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - !write(0,*) name,' matbld_noremote_ nothing needed' - case (psb_matbld_remote_) - !write(0,*) name,' matbld_remote_ start ' + call a%set_remote_build(bldmode_) + if (a%is_remote_build()) then allocate(a%rmta) nnzrmt_ = max(100,(nnz_/100)) call a%rmta%allocate(m,n,nnzrmt_) - - case default - write(0,*) name,'Invalid value for remote_build ' - a%remote_build = psb_matbld_noremote_ - end select + end if + +!!$ a%remote_build = bldmode_ +!!$ select case(a%remote_build) +!!$ case (psb_matbld_noremote_) +!!$ ! nothing needed +!!$ !write(0,*) name,' matbld_noremote_ nothing needed' +!!$ case (psb_matbld_remote_) +!!$ !write(0,*) name,' matbld_remote_ start ' +!!$ allocate(a%rmta) +!!$ nnzrmt_ = max(100,(nnz_/100)) +!!$ call a%rmta%allocate(m,n,nnzrmt_) +!!$ +!!$ case default +!!$ write(0,*) name,'Invalid value for remote_build ' +!!$ a%remote_build = psb_matbld_noremote_ +!!$ end select if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': ', & diff --git a/base/tools/psb_cspasb.f90 b/base/tools/psb_cspasb.f90 index 390bdcaf..95f3cf37 100644 --- a/base/tools/psb_cspasb.f90 +++ b/base/tools/psb_cspasb.f90 @@ -106,11 +106,7 @@ subroutine psb_cspasb(a,desc_a, info, afmt, upd, dupl, mold) ! ! First case: we come from a fresh build. ! - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - case (psb_matbld_remote_) + if (a%is_remote_build()) then !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() block type(psb_lc_coo_sparse_mat) :: a_add @@ -143,11 +139,42 @@ subroutine psb_cspasb(a,desc_a, info, afmt, upd, dupl, mold) if (nzt > 0) call psb_cdasb(desc_a,info,mold=ivm) end block - end select - call a%set_ncols(desc_a%get_local_cols()) - + end if + call a%set_ncols(desc_a%get_local_cols()) call a%cscnv(info,type=afmt,dupl=dupl, mold=mold) else if (a%is_upd()) then + if (a%is_remote_build()) then + !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() + block + type(psb_lc_coo_sparse_mat) :: a_add + integer(psb_ipk_), allocatable :: ila(:), jla(:) + integer(psb_ipk_) :: nz, nzt,k + call psb_remote_mat(a%rmta,desc_a,a_add,info) + nz = a_add%get_nzeros() +!!$ write(0,*) me,name,' Nz to be added',nz + nzt = nz + call psb_sum(ctxt,nzt) + if (nzt>0) then + allocate(ivm, mold=desc_a%v_halo_index%v) + call psb_cd_reinit(desc_a, info) + end if + if (nz > 0) then + ! + ! Should we check for new indices here? + ! + call psb_realloc(nz,ila,info) + call psb_realloc(nz,jla,info) + call desc_a%indxmap%g2l(a_add%ia(1:nz),ila(1:nz),info,owned=.true.) + if (info == 0) call desc_a%indxmap%g2l_ins(a_add%ja(1:nz),jla(1:nz),info) + !write(0,*) me,name,' Check before insert',a%get_nzeros() + n_row = desc_a%get_local_rows() + n_col = desc_a%get_local_cols() + call a%set_ncols(desc_a%get_local_cols()) + call a%csput(nz,ila,jla,a_add%val,ione,n_row,ione,n_col,info) + !write(0,*) me,name,' Check after insert',a%get_nzeros(),nz + end if + end block + end if call a%asb(mold=mold) else info = psb_err_invalid_mat_state_ diff --git a/base/tools/psb_cspins.F90 b/base/tools/psb_cspins.F90 index 553e3e3d..27cfbd8e 100644 --- a/base/tools/psb_cspins.F90 +++ b/base/tools/psb_cspins.F90 @@ -151,11 +151,8 @@ subroutine psb_cspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -173,9 +170,7 @@ subroutine psb_cspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select + end if else info = psb_err_invalid_a_and_cd_state_ @@ -208,10 +203,7 @@ subroutine psb_cspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -229,10 +221,7 @@ subroutine psb_cspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select - + end if else info = psb_err_invalid_cd_state_ call psb_errpush(info,name) diff --git a/base/tools/psb_d_remote_mat.F90 b/base/tools/psb_d_remote_mat.F90 index 290a6f3d..8d67dfbb 100644 --- a/base/tools/psb_d_remote_mat.F90 +++ b/base/tools/psb_d_remote_mat.F90 @@ -29,9 +29,9 @@ ! POSSIBILITY OF SUCH DAMAGE. ! ! -! File: psb_dsphalo.f90 +! File: psb_d_remote_mat.f90 ! -! Subroutine: psb_dsphalo psb_ldsphalo +! Subroutine: ! This routine does the retrieval of remote matrix rows. ! Retrieval is done through GETROW, therefore it should work ! for any matrix format in A; as for the output, default is CSR. @@ -278,3 +278,221 @@ Subroutine psb_ld_remote_mat(a,desc_a,b,info) End Subroutine psb_ld_remote_mat + +subroutine psb_d_remote_vect(v,desc_a, info, dupl) + use psb_base_mod, psb_protect_name => psb_d_remote_vect + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_d_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_),optional, intent(in) :: dupl + + ! ...local scalars.... + type(psb_ctxt_type) :: ctxt + integer(psb_ipk_) :: np,me + integer(psb_ipk_) :: counter, proc, i, n_el_send,n_el_recv, & + & n_elem, j, ipx,mat_recv, idxs,idxr,& + & data_,totxch,nxs, nxr, ncg, dupl_ + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem + integer(psb_lpk_) :: nz,nouth + integer(psb_ipk_) :: nnp, nrcvs, nsnds + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:), sdsi(:), rvsi(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + real(psb_dpk_), allocatable :: valsnd(:) + integer(psb_ipk_), allocatable :: ladj(:), ila(:), iprc(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_d_remote_vect' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + if (present(dupl)) then + dupl_ = dupl + else + if (v%is_remote_build()) then + dupl_ = psb_dupl_add_ + else + dupl_ = psb_dupl_ovwrt_ + end if + endif + + ctxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ctxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + write(0,*) me, 'X_remote_vect implementation to be completed ' + +!!$ call b%free() +!!$ +!!$ Allocate(rvsz(np),sdsz(np),sdsi(np),rvsi(np),brvindx(np+1),& +!!$ & bsdindx(np+1), acoo,stat=info) +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_alloc_dealloc_ +!!$ call psb_errpush(info,name) +!!$ goto 9999 +!!$ end if +!!$ +!!$ +!!$ nz = a%get_nzeros() +!!$ allocate(ila(nz)) +!!$ !write(0,*) me,name,' size :',nz,size(ila) +!!$ call desc_a%g2l(a%ia(1:nz),ila(1:nz),info,owned=.false.) +!!$ nouth = count(ila(1:nz)<0) +!!$ !write(0,*) me,name,' Count out of halo :',nouth +!!$ call psb_max(ctxt,nouth) +!!$ if ((nouth/=0).and.(me==0)) & +!!$ & write(0,*) 'Warning: would require reinit of DESC_A' +!!$ +!!$ call psi_graph_fnd_owner(a%ia(1:nz),iprc,ladj,desc_a%indxmap,info) +!!$ call psb_msort_unique(ladj,nnp) +!!$ !write(0,*) me,name,' Processes:',ladj(1:nnp) +!!$ +!!$ icomm = desc_a%get_mpic() +!!$ sdsz(:)=0 +!!$ rvsz(:)=0 +!!$ sdsi(:)=0 +!!$ rvsi(:)=0 +!!$ ipx = 1 +!!$ brvindx(:) = 0 +!!$ bsdindx(:) = 0 +!!$ counter=1 +!!$ idx = 0 +!!$ idxs = 0 +!!$ idxr = 0 +!!$ do i=1,nz +!!$ if (iprc(i) >=0) then +!!$ sdsz(iprc(i)+1) = sdsz(iprc(i)+1) +1 +!!$ else +!!$ write(0,*)me,name,' Error from fnd_owner: ',iprc(i) +!!$ end if +!!$ end do +!!$ call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& +!!$ & rvsz,1,psb_mpi_mpk_,icomm,minfo) +!!$ if (minfo /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='mpi_alltoall') +!!$ goto 9999 +!!$ end if +!!$ !write(0,*)me,name,' sdsz ',sdsz(:),' rvsz:',rvsz(:) +!!$ nsnds = count(sdsz /= 0) +!!$ nrcvs = count(rvsz /= 0) +!!$ idxs = 0 +!!$ idxr = 0 +!!$ counter = 1 +!!$ Do proc=0,np-1 +!!$ bsdindx(proc+1) = idxs +!!$ idxs = idxs + sdsz(proc+1) +!!$ brvindx(proc+1) = idxr +!!$ idxr = idxr + rvsz(proc+1) +!!$ Enddo +!!$ +!!$ iszs = sum(sdsz) +!!$ iszr = sum(rvsz) +!!$ call acoo%allocate(desc_a%get_global_rows(),desc_a%get_global_cols(),iszr) +!!$ if (psb_errstatus_fatal()) then +!!$ write(0,*) 'Error from acoo%allocate ' +!!$ info = 4010 +!!$ goto 9999 +!!$ end if +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& +!!$ & ' Send:',sdsz(:),' Receive:',rvsz(:) +!!$ !write(debug_unit,*) me,' ',trim(name),': ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' iasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' jasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' valsnd: ',info +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='ensure_size') +!!$ goto 9999 +!!$ end if +!!$ do k=1, nz +!!$ proc = iprc(k) +!!$ sdsi(proc+1) = sdsi(proc+1) + 1 +!!$ !rvsi(proc) = rvsi(proc) + 1 +!!$ iasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ia(k) +!!$ jasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ja(k) +!!$ valsnd(bsdindx(proc+1)+sdsi(proc+1)) = a%val(k) +!!$ end do +!!$ do proc=0,np-1 +!!$ if (sdsi(proc+1) /= sdsz(proc+1)) & +!!$ & write(0,*) me,name,'Send mismacth ',sdsi(proc+1),sdsz(proc+1) +!!$ end do +!!$ +!!$ select case(psb_get_sp_a2av_alg()) +!!$ case(psb_sp_a2av_smpl_triad_) +!!$ call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& +!!$ & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_smpl_v_) +!!$ call psb_simple_a2av(valsnd,sdsz,bsdindx,& +!!$ & acoo%val,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& +!!$ & acoo%ia,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& +!!$ & acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_mpi_) +!!$ +!!$ call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_dpk_,& +!!$ & acoo%val,rvsz,brvindx,psb_mpi_r_dpk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo /= mpi_success) info = minfo +!!$ case default +!!$ info = psb_err_internal_error_ +!!$ call psb_errpush(info,name,a_err='wrong A2AV alg selector') +!!$ goto 9999 +!!$ end select +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='alltoallv') +!!$ goto 9999 +!!$ end if +!!$ call acoo%set_nzeros(iszr) +!!$ call acoo%mv_to_coo(b,info) +!!$ +!!$ Deallocate(brvindx,bsdindx,rvsz,sdsz,& +!!$ & iasnd,jasnd,valsnd,stat=info) +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ctxt,err_act) + + return + +End Subroutine psb_d_remote_vect + + diff --git a/base/tools/psb_dasb.f90 b/base/tools/psb_dasb.f90 index 5ebee093..34bd345b 100644 --- a/base/tools/psb_dasb.f90 +++ b/base/tools/psb_dasb.f90 @@ -51,7 +51,7 @@ ! scratch - logical, optional If true, allocate without checking/zeroing contents. ! default: .false. ! -subroutine psb_dasb_vect(x, desc_a, info, mold, scratch) +subroutine psb_dasb_vect(x, desc_a, info, mold, dupl,scratch) use psb_base_mod, psb_protect_name => psb_dasb_vect implicit none @@ -59,12 +59,13 @@ subroutine psb_dasb_vect(x, desc_a, info, mold, scratch) type(psb_d_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_d_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch ! local variables type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np,me - integer(psb_ipk_) :: i1sz,nrow,ncol, err_act + integer(psb_ipk_) :: i1sz,nrow,ncol, err_act, dupl_ logical :: scratch_ integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name,ch_err @@ -82,6 +83,11 @@ subroutine psb_dasb_vect(x, desc_a, info, mold, scratch) scratch_ = .false. if (present(scratch)) scratch_ = scratch + if (present(dupl)) then + dupl_ = dupl + else + dupl_ = psb_dupl_ovwrt_ + endif call psb_info(ctxt, me, np) ! ....verify blacs grid correctness.. @@ -104,6 +110,7 @@ subroutine psb_dasb_vect(x, desc_a, info, mold, scratch) call x%free(info) call x%bld(ncol,mold=mold) else + if (x%is_remote_build()) call psb_d_remote_vect(x,desc_a,dupl_, info) call x%asb(ncol,info) ! ..update halo elements.. call psb_halo(x,desc_a,info) diff --git a/base/tools/psb_dins.f90 b/base/tools/psb_dins.f90 index 3e873ded..5aaceec1 100644 --- a/base/tools/psb_dins.f90 +++ b/base/tools/psb_dins.f90 @@ -112,7 +112,6 @@ subroutine psb_dins_vect(m, irw, val, x, desc_a, info, dupl,local) endif - allocate(irl(m),stat=info) if (info /= psb_success_) then info = psb_err_alloc_dealloc_ diff --git a/base/tools/psb_dspalloc.f90 b/base/tools/psb_dspalloc.f90 index 781a3abf..31381bca 100644 --- a/base/tools/psb_dspalloc.f90 +++ b/base/tools/psb_dspalloc.f90 @@ -116,22 +116,28 @@ subroutine psb_dspalloc(a, desc_a, info, nnz, bldmode) !!$ write(0,*) name,'Setting a%remote_build ',& !!$ & bldmode_,psb_matbld_noremote_,psb_matbld_remote_ - a%remote_build = bldmode_ - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - !write(0,*) name,' matbld_noremote_ nothing needed' - case (psb_matbld_remote_) - !write(0,*) name,' matbld_remote_ start ' + call a%set_remote_build(bldmode_) + if (a%is_remote_build()) then allocate(a%rmta) nnzrmt_ = max(100,(nnz_/100)) call a%rmta%allocate(m,n,nnzrmt_) - - case default - write(0,*) name,'Invalid value for remote_build ' - a%remote_build = psb_matbld_noremote_ - end select + end if + +!!$ a%remote_build = bldmode_ +!!$ select case(a%remote_build) +!!$ case (psb_matbld_noremote_) +!!$ ! nothing needed +!!$ !write(0,*) name,' matbld_noremote_ nothing needed' +!!$ case (psb_matbld_remote_) +!!$ !write(0,*) name,' matbld_remote_ start ' +!!$ allocate(a%rmta) +!!$ nnzrmt_ = max(100,(nnz_/100)) +!!$ call a%rmta%allocate(m,n,nnzrmt_) +!!$ +!!$ case default +!!$ write(0,*) name,'Invalid value for remote_build ' +!!$ a%remote_build = psb_matbld_noremote_ +!!$ end select if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': ', & diff --git a/base/tools/psb_dspasb.f90 b/base/tools/psb_dspasb.f90 index d82f5efe..fdf1d16c 100644 --- a/base/tools/psb_dspasb.f90 +++ b/base/tools/psb_dspasb.f90 @@ -106,11 +106,7 @@ subroutine psb_dspasb(a,desc_a, info, afmt, upd, dupl, mold) ! ! First case: we come from a fresh build. ! - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - case (psb_matbld_remote_) + if (a%is_remote_build()) then !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() block type(psb_ld_coo_sparse_mat) :: a_add @@ -143,11 +139,42 @@ subroutine psb_dspasb(a,desc_a, info, afmt, upd, dupl, mold) if (nzt > 0) call psb_cdasb(desc_a,info,mold=ivm) end block - end select - call a%set_ncols(desc_a%get_local_cols()) - + end if + call a%set_ncols(desc_a%get_local_cols()) call a%cscnv(info,type=afmt,dupl=dupl, mold=mold) else if (a%is_upd()) then + if (a%is_remote_build()) then + !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() + block + type(psb_ld_coo_sparse_mat) :: a_add + integer(psb_ipk_), allocatable :: ila(:), jla(:) + integer(psb_ipk_) :: nz, nzt,k + call psb_remote_mat(a%rmta,desc_a,a_add,info) + nz = a_add%get_nzeros() +!!$ write(0,*) me,name,' Nz to be added',nz + nzt = nz + call psb_sum(ctxt,nzt) + if (nzt>0) then + allocate(ivm, mold=desc_a%v_halo_index%v) + call psb_cd_reinit(desc_a, info) + end if + if (nz > 0) then + ! + ! Should we check for new indices here? + ! + call psb_realloc(nz,ila,info) + call psb_realloc(nz,jla,info) + call desc_a%indxmap%g2l(a_add%ia(1:nz),ila(1:nz),info,owned=.true.) + if (info == 0) call desc_a%indxmap%g2l_ins(a_add%ja(1:nz),jla(1:nz),info) + !write(0,*) me,name,' Check before insert',a%get_nzeros() + n_row = desc_a%get_local_rows() + n_col = desc_a%get_local_cols() + call a%set_ncols(desc_a%get_local_cols()) + call a%csput(nz,ila,jla,a_add%val,ione,n_row,ione,n_col,info) + !write(0,*) me,name,' Check after insert',a%get_nzeros(),nz + end if + end block + end if call a%asb(mold=mold) else info = psb_err_invalid_mat_state_ diff --git a/base/tools/psb_dspins.F90 b/base/tools/psb_dspins.F90 index 064b816c..2a70ab83 100644 --- a/base/tools/psb_dspins.F90 +++ b/base/tools/psb_dspins.F90 @@ -151,11 +151,8 @@ subroutine psb_dspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -173,9 +170,7 @@ subroutine psb_dspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select + end if else info = psb_err_invalid_a_and_cd_state_ @@ -208,10 +203,7 @@ subroutine psb_dspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -229,10 +221,7 @@ subroutine psb_dspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select - + end if else info = psb_err_invalid_cd_state_ call psb_errpush(info,name) diff --git a/base/tools/psb_iasb.f90 b/base/tools/psb_iasb.f90 index d0cf2d83..8c3db6d6 100644 --- a/base/tools/psb_iasb.f90 +++ b/base/tools/psb_iasb.f90 @@ -51,7 +51,7 @@ ! scratch - logical, optional If true, allocate without checking/zeroing contents. ! default: .false. ! -subroutine psb_iasb_vect(x, desc_a, info, mold, scratch) +subroutine psb_iasb_vect(x, desc_a, info, mold, dupl,scratch) use psb_base_mod, psb_protect_name => psb_iasb_vect implicit none @@ -59,12 +59,13 @@ subroutine psb_iasb_vect(x, desc_a, info, mold, scratch) type(psb_i_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_i_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch ! local variables type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np,me - integer(psb_ipk_) :: i1sz,nrow,ncol, err_act + integer(psb_ipk_) :: i1sz,nrow,ncol, err_act, dupl_ logical :: scratch_ integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name,ch_err @@ -82,6 +83,11 @@ subroutine psb_iasb_vect(x, desc_a, info, mold, scratch) scratch_ = .false. if (present(scratch)) scratch_ = scratch + if (present(dupl)) then + dupl_ = dupl + else + dupl_ = psb_dupl_ovwrt_ + endif call psb_info(ctxt, me, np) ! ....verify blacs grid correctness.. @@ -104,6 +110,7 @@ subroutine psb_iasb_vect(x, desc_a, info, mold, scratch) call x%free(info) call x%bld(ncol,mold=mold) else + if (x%is_remote_build()) call psb_i_remote_vect(x,desc_a,dupl_, info) call x%asb(ncol,info) ! ..update halo elements.. call psb_halo(x,desc_a,info) diff --git a/base/tools/psb_iins.f90 b/base/tools/psb_iins.f90 index c9c0ed9b..3f72494e 100644 --- a/base/tools/psb_iins.f90 +++ b/base/tools/psb_iins.f90 @@ -112,7 +112,6 @@ subroutine psb_iins_vect(m, irw, val, x, desc_a, info, dupl,local) endif - allocate(irl(m),stat=info) if (info /= psb_success_) then info = psb_err_alloc_dealloc_ diff --git a/base/tools/psb_lasb.f90 b/base/tools/psb_lasb.f90 index 1618abdb..529dbe44 100644 --- a/base/tools/psb_lasb.f90 +++ b/base/tools/psb_lasb.f90 @@ -51,7 +51,7 @@ ! scratch - logical, optional If true, allocate without checking/zeroing contents. ! default: .false. ! -subroutine psb_lasb_vect(x, desc_a, info, mold, scratch) +subroutine psb_lasb_vect(x, desc_a, info, mold, dupl,scratch) use psb_base_mod, psb_protect_name => psb_lasb_vect implicit none @@ -59,12 +59,13 @@ subroutine psb_lasb_vect(x, desc_a, info, mold, scratch) type(psb_l_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_l_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch ! local variables type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np,me - integer(psb_ipk_) :: i1sz,nrow,ncol, err_act + integer(psb_ipk_) :: i1sz,nrow,ncol, err_act, dupl_ logical :: scratch_ integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name,ch_err @@ -82,6 +83,11 @@ subroutine psb_lasb_vect(x, desc_a, info, mold, scratch) scratch_ = .false. if (present(scratch)) scratch_ = scratch + if (present(dupl)) then + dupl_ = dupl + else + dupl_ = psb_dupl_ovwrt_ + endif call psb_info(ctxt, me, np) ! ....verify blacs grid correctness.. @@ -104,6 +110,7 @@ subroutine psb_lasb_vect(x, desc_a, info, mold, scratch) call x%free(info) call x%bld(ncol,mold=mold) else + if (x%is_remote_build()) call psb_l_remote_vect(x,desc_a,dupl_, info) call x%asb(ncol,info) ! ..update halo elements.. call psb_halo(x,desc_a,info) diff --git a/base/tools/psb_lins.f90 b/base/tools/psb_lins.f90 index 42559a94..27478f0a 100644 --- a/base/tools/psb_lins.f90 +++ b/base/tools/psb_lins.f90 @@ -112,7 +112,6 @@ subroutine psb_lins_vect(m, irw, val, x, desc_a, info, dupl,local) endif - allocate(irl(m),stat=info) if (info /= psb_success_) then info = psb_err_alloc_dealloc_ diff --git a/base/tools/psb_s_remote_mat.F90 b/base/tools/psb_s_remote_mat.F90 index 26b12652..0c9e0307 100644 --- a/base/tools/psb_s_remote_mat.F90 +++ b/base/tools/psb_s_remote_mat.F90 @@ -29,9 +29,9 @@ ! POSSIBILITY OF SUCH DAMAGE. ! ! -! File: psb_ssphalo.f90 +! File: psb_s_remote_mat.f90 ! -! Subroutine: psb_ssphalo psb_lssphalo +! Subroutine: ! This routine does the retrieval of remote matrix rows. ! Retrieval is done through GETROW, therefore it should work ! for any matrix format in A; as for the output, default is CSR. @@ -278,3 +278,221 @@ Subroutine psb_ls_remote_mat(a,desc_a,b,info) End Subroutine psb_ls_remote_mat + +subroutine psb_s_remote_vect(v,desc_a, info, dupl) + use psb_base_mod, psb_protect_name => psb_s_remote_vect + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_s_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_),optional, intent(in) :: dupl + + ! ...local scalars.... + type(psb_ctxt_type) :: ctxt + integer(psb_ipk_) :: np,me + integer(psb_ipk_) :: counter, proc, i, n_el_send,n_el_recv, & + & n_elem, j, ipx,mat_recv, idxs,idxr,& + & data_,totxch,nxs, nxr, ncg, dupl_ + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem + integer(psb_lpk_) :: nz,nouth + integer(psb_ipk_) :: nnp, nrcvs, nsnds + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:), sdsi(:), rvsi(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + real(psb_spk_), allocatable :: valsnd(:) + integer(psb_ipk_), allocatable :: ladj(:), ila(:), iprc(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_s_remote_vect' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + if (present(dupl)) then + dupl_ = dupl + else + if (v%is_remote_build()) then + dupl_ = psb_dupl_add_ + else + dupl_ = psb_dupl_ovwrt_ + end if + endif + + ctxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ctxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + write(0,*) me, 'X_remote_vect implementation to be completed ' + +!!$ call b%free() +!!$ +!!$ Allocate(rvsz(np),sdsz(np),sdsi(np),rvsi(np),brvindx(np+1),& +!!$ & bsdindx(np+1), acoo,stat=info) +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_alloc_dealloc_ +!!$ call psb_errpush(info,name) +!!$ goto 9999 +!!$ end if +!!$ +!!$ +!!$ nz = a%get_nzeros() +!!$ allocate(ila(nz)) +!!$ !write(0,*) me,name,' size :',nz,size(ila) +!!$ call desc_a%g2l(a%ia(1:nz),ila(1:nz),info,owned=.false.) +!!$ nouth = count(ila(1:nz)<0) +!!$ !write(0,*) me,name,' Count out of halo :',nouth +!!$ call psb_max(ctxt,nouth) +!!$ if ((nouth/=0).and.(me==0)) & +!!$ & write(0,*) 'Warning: would require reinit of DESC_A' +!!$ +!!$ call psi_graph_fnd_owner(a%ia(1:nz),iprc,ladj,desc_a%indxmap,info) +!!$ call psb_msort_unique(ladj,nnp) +!!$ !write(0,*) me,name,' Processes:',ladj(1:nnp) +!!$ +!!$ icomm = desc_a%get_mpic() +!!$ sdsz(:)=0 +!!$ rvsz(:)=0 +!!$ sdsi(:)=0 +!!$ rvsi(:)=0 +!!$ ipx = 1 +!!$ brvindx(:) = 0 +!!$ bsdindx(:) = 0 +!!$ counter=1 +!!$ idx = 0 +!!$ idxs = 0 +!!$ idxr = 0 +!!$ do i=1,nz +!!$ if (iprc(i) >=0) then +!!$ sdsz(iprc(i)+1) = sdsz(iprc(i)+1) +1 +!!$ else +!!$ write(0,*)me,name,' Error from fnd_owner: ',iprc(i) +!!$ end if +!!$ end do +!!$ call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& +!!$ & rvsz,1,psb_mpi_mpk_,icomm,minfo) +!!$ if (minfo /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='mpi_alltoall') +!!$ goto 9999 +!!$ end if +!!$ !write(0,*)me,name,' sdsz ',sdsz(:),' rvsz:',rvsz(:) +!!$ nsnds = count(sdsz /= 0) +!!$ nrcvs = count(rvsz /= 0) +!!$ idxs = 0 +!!$ idxr = 0 +!!$ counter = 1 +!!$ Do proc=0,np-1 +!!$ bsdindx(proc+1) = idxs +!!$ idxs = idxs + sdsz(proc+1) +!!$ brvindx(proc+1) = idxr +!!$ idxr = idxr + rvsz(proc+1) +!!$ Enddo +!!$ +!!$ iszs = sum(sdsz) +!!$ iszr = sum(rvsz) +!!$ call acoo%allocate(desc_a%get_global_rows(),desc_a%get_global_cols(),iszr) +!!$ if (psb_errstatus_fatal()) then +!!$ write(0,*) 'Error from acoo%allocate ' +!!$ info = 4010 +!!$ goto 9999 +!!$ end if +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& +!!$ & ' Send:',sdsz(:),' Receive:',rvsz(:) +!!$ !write(debug_unit,*) me,' ',trim(name),': ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' iasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' jasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' valsnd: ',info +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='ensure_size') +!!$ goto 9999 +!!$ end if +!!$ do k=1, nz +!!$ proc = iprc(k) +!!$ sdsi(proc+1) = sdsi(proc+1) + 1 +!!$ !rvsi(proc) = rvsi(proc) + 1 +!!$ iasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ia(k) +!!$ jasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ja(k) +!!$ valsnd(bsdindx(proc+1)+sdsi(proc+1)) = a%val(k) +!!$ end do +!!$ do proc=0,np-1 +!!$ if (sdsi(proc+1) /= sdsz(proc+1)) & +!!$ & write(0,*) me,name,'Send mismacth ',sdsi(proc+1),sdsz(proc+1) +!!$ end do +!!$ +!!$ select case(psb_get_sp_a2av_alg()) +!!$ case(psb_sp_a2av_smpl_triad_) +!!$ call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& +!!$ & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_smpl_v_) +!!$ call psb_simple_a2av(valsnd,sdsz,bsdindx,& +!!$ & acoo%val,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& +!!$ & acoo%ia,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& +!!$ & acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_mpi_) +!!$ +!!$ call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_spk_,& +!!$ & acoo%val,rvsz,brvindx,psb_mpi_r_spk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo /= mpi_success) info = minfo +!!$ case default +!!$ info = psb_err_internal_error_ +!!$ call psb_errpush(info,name,a_err='wrong A2AV alg selector') +!!$ goto 9999 +!!$ end select +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='alltoallv') +!!$ goto 9999 +!!$ end if +!!$ call acoo%set_nzeros(iszr) +!!$ call acoo%mv_to_coo(b,info) +!!$ +!!$ Deallocate(brvindx,bsdindx,rvsz,sdsz,& +!!$ & iasnd,jasnd,valsnd,stat=info) +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ctxt,err_act) + + return + +End Subroutine psb_s_remote_vect + + diff --git a/base/tools/psb_sasb.f90 b/base/tools/psb_sasb.f90 index ac3a0684..b13f095a 100644 --- a/base/tools/psb_sasb.f90 +++ b/base/tools/psb_sasb.f90 @@ -51,7 +51,7 @@ ! scratch - logical, optional If true, allocate without checking/zeroing contents. ! default: .false. ! -subroutine psb_sasb_vect(x, desc_a, info, mold, scratch) +subroutine psb_sasb_vect(x, desc_a, info, mold, dupl,scratch) use psb_base_mod, psb_protect_name => psb_sasb_vect implicit none @@ -59,12 +59,13 @@ subroutine psb_sasb_vect(x, desc_a, info, mold, scratch) type(psb_s_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_s_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch ! local variables type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np,me - integer(psb_ipk_) :: i1sz,nrow,ncol, err_act + integer(psb_ipk_) :: i1sz,nrow,ncol, err_act, dupl_ logical :: scratch_ integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name,ch_err @@ -82,6 +83,11 @@ subroutine psb_sasb_vect(x, desc_a, info, mold, scratch) scratch_ = .false. if (present(scratch)) scratch_ = scratch + if (present(dupl)) then + dupl_ = dupl + else + dupl_ = psb_dupl_ovwrt_ + endif call psb_info(ctxt, me, np) ! ....verify blacs grid correctness.. @@ -104,6 +110,7 @@ subroutine psb_sasb_vect(x, desc_a, info, mold, scratch) call x%free(info) call x%bld(ncol,mold=mold) else + if (x%is_remote_build()) call psb_s_remote_vect(x,desc_a,dupl_, info) call x%asb(ncol,info) ! ..update halo elements.. call psb_halo(x,desc_a,info) diff --git a/base/tools/psb_sins.f90 b/base/tools/psb_sins.f90 index cb878c64..2ef44c85 100644 --- a/base/tools/psb_sins.f90 +++ b/base/tools/psb_sins.f90 @@ -112,7 +112,6 @@ subroutine psb_sins_vect(m, irw, val, x, desc_a, info, dupl,local) endif - allocate(irl(m),stat=info) if (info /= psb_success_) then info = psb_err_alloc_dealloc_ diff --git a/base/tools/psb_sspalloc.f90 b/base/tools/psb_sspalloc.f90 index 14e784da..bcaee92b 100644 --- a/base/tools/psb_sspalloc.f90 +++ b/base/tools/psb_sspalloc.f90 @@ -116,22 +116,28 @@ subroutine psb_sspalloc(a, desc_a, info, nnz, bldmode) !!$ write(0,*) name,'Setting a%remote_build ',& !!$ & bldmode_,psb_matbld_noremote_,psb_matbld_remote_ - a%remote_build = bldmode_ - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - !write(0,*) name,' matbld_noremote_ nothing needed' - case (psb_matbld_remote_) - !write(0,*) name,' matbld_remote_ start ' + call a%set_remote_build(bldmode_) + if (a%is_remote_build()) then allocate(a%rmta) nnzrmt_ = max(100,(nnz_/100)) call a%rmta%allocate(m,n,nnzrmt_) - - case default - write(0,*) name,'Invalid value for remote_build ' - a%remote_build = psb_matbld_noremote_ - end select + end if + +!!$ a%remote_build = bldmode_ +!!$ select case(a%remote_build) +!!$ case (psb_matbld_noremote_) +!!$ ! nothing needed +!!$ !write(0,*) name,' matbld_noremote_ nothing needed' +!!$ case (psb_matbld_remote_) +!!$ !write(0,*) name,' matbld_remote_ start ' +!!$ allocate(a%rmta) +!!$ nnzrmt_ = max(100,(nnz_/100)) +!!$ call a%rmta%allocate(m,n,nnzrmt_) +!!$ +!!$ case default +!!$ write(0,*) name,'Invalid value for remote_build ' +!!$ a%remote_build = psb_matbld_noremote_ +!!$ end select if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': ', & diff --git a/base/tools/psb_sspasb.f90 b/base/tools/psb_sspasb.f90 index 87372d01..cd2cdfa9 100644 --- a/base/tools/psb_sspasb.f90 +++ b/base/tools/psb_sspasb.f90 @@ -106,11 +106,7 @@ subroutine psb_sspasb(a,desc_a, info, afmt, upd, dupl, mold) ! ! First case: we come from a fresh build. ! - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - case (psb_matbld_remote_) + if (a%is_remote_build()) then !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() block type(psb_ls_coo_sparse_mat) :: a_add @@ -143,11 +139,42 @@ subroutine psb_sspasb(a,desc_a, info, afmt, upd, dupl, mold) if (nzt > 0) call psb_cdasb(desc_a,info,mold=ivm) end block - end select - call a%set_ncols(desc_a%get_local_cols()) - + end if + call a%set_ncols(desc_a%get_local_cols()) call a%cscnv(info,type=afmt,dupl=dupl, mold=mold) else if (a%is_upd()) then + if (a%is_remote_build()) then + !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() + block + type(psb_ls_coo_sparse_mat) :: a_add + integer(psb_ipk_), allocatable :: ila(:), jla(:) + integer(psb_ipk_) :: nz, nzt,k + call psb_remote_mat(a%rmta,desc_a,a_add,info) + nz = a_add%get_nzeros() +!!$ write(0,*) me,name,' Nz to be added',nz + nzt = nz + call psb_sum(ctxt,nzt) + if (nzt>0) then + allocate(ivm, mold=desc_a%v_halo_index%v) + call psb_cd_reinit(desc_a, info) + end if + if (nz > 0) then + ! + ! Should we check for new indices here? + ! + call psb_realloc(nz,ila,info) + call psb_realloc(nz,jla,info) + call desc_a%indxmap%g2l(a_add%ia(1:nz),ila(1:nz),info,owned=.true.) + if (info == 0) call desc_a%indxmap%g2l_ins(a_add%ja(1:nz),jla(1:nz),info) + !write(0,*) me,name,' Check before insert',a%get_nzeros() + n_row = desc_a%get_local_rows() + n_col = desc_a%get_local_cols() + call a%set_ncols(desc_a%get_local_cols()) + call a%csput(nz,ila,jla,a_add%val,ione,n_row,ione,n_col,info) + !write(0,*) me,name,' Check after insert',a%get_nzeros(),nz + end if + end block + end if call a%asb(mold=mold) else info = psb_err_invalid_mat_state_ diff --git a/base/tools/psb_sspins.F90 b/base/tools/psb_sspins.F90 index e49e7423..aee7a900 100644 --- a/base/tools/psb_sspins.F90 +++ b/base/tools/psb_sspins.F90 @@ -151,11 +151,8 @@ subroutine psb_sspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -173,9 +170,7 @@ subroutine psb_sspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select + end if else info = psb_err_invalid_a_and_cd_state_ @@ -208,10 +203,7 @@ subroutine psb_sspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -229,10 +221,7 @@ subroutine psb_sspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select - + end if else info = psb_err_invalid_cd_state_ call psb_errpush(info,name) diff --git a/base/tools/psb_z_remote_mat.F90 b/base/tools/psb_z_remote_mat.F90 index 7b2ade7c..2f5c05a7 100644 --- a/base/tools/psb_z_remote_mat.F90 +++ b/base/tools/psb_z_remote_mat.F90 @@ -29,9 +29,9 @@ ! POSSIBILITY OF SUCH DAMAGE. ! ! -! File: psb_zsphalo.f90 +! File: psb_z_remote_mat.f90 ! -! Subroutine: psb_zsphalo psb_lzsphalo +! Subroutine: ! This routine does the retrieval of remote matrix rows. ! Retrieval is done through GETROW, therefore it should work ! for any matrix format in A; as for the output, default is CSR. @@ -278,3 +278,221 @@ Subroutine psb_lz_remote_mat(a,desc_a,b,info) End Subroutine psb_lz_remote_mat + +subroutine psb_z_remote_vect(v,desc_a, info, dupl) + use psb_base_mod, psb_protect_name => psb_z_remote_vect + +#ifdef MPI_MOD + use mpi +#endif + Implicit None +#ifdef MPI_H + include 'mpif.h' +#endif + + type(psb_z_vect_type),Intent(inout) :: v + type(psb_desc_type),intent(in) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_),optional, intent(in) :: dupl + + ! ...local scalars.... + type(psb_ctxt_type) :: ctxt + integer(psb_ipk_) :: np,me + integer(psb_ipk_) :: counter, proc, i, n_el_send,n_el_recv, & + & n_elem, j, ipx,mat_recv, idxs,idxr,& + & data_,totxch,nxs, nxr, ncg, dupl_ + integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem + integer(psb_lpk_) :: nz,nouth + integer(psb_ipk_) :: nnp, nrcvs, nsnds + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:),sdsz(:), sdsi(:), rvsi(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:) + complex(psb_dpk_), allocatable :: valsnd(:) + integer(psb_ipk_), allocatable :: ladj(:), ila(:), iprc(:) + logical :: rowcnv_,colcnv_,rowscale_,colscale_ + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit, err_act + character(len=20) :: name, ch_err + + info=psb_success_ + name='psb_z_remote_vect' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + if (present(dupl)) then + dupl_ = dupl + else + if (v%is_remote_build()) then + dupl_ = psb_dupl_add_ + else + dupl_ = psb_dupl_ovwrt_ + end if + endif + + ctxt = desc_a%get_context() + icomm = desc_a%get_mpic() + + Call psb_info(ctxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + write(0,*) me, 'X_remote_vect implementation to be completed ' + +!!$ call b%free() +!!$ +!!$ Allocate(rvsz(np),sdsz(np),sdsi(np),rvsi(np),brvindx(np+1),& +!!$ & bsdindx(np+1), acoo,stat=info) +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_alloc_dealloc_ +!!$ call psb_errpush(info,name) +!!$ goto 9999 +!!$ end if +!!$ +!!$ +!!$ nz = a%get_nzeros() +!!$ allocate(ila(nz)) +!!$ !write(0,*) me,name,' size :',nz,size(ila) +!!$ call desc_a%g2l(a%ia(1:nz),ila(1:nz),info,owned=.false.) +!!$ nouth = count(ila(1:nz)<0) +!!$ !write(0,*) me,name,' Count out of halo :',nouth +!!$ call psb_max(ctxt,nouth) +!!$ if ((nouth/=0).and.(me==0)) & +!!$ & write(0,*) 'Warning: would require reinit of DESC_A' +!!$ +!!$ call psi_graph_fnd_owner(a%ia(1:nz),iprc,ladj,desc_a%indxmap,info) +!!$ call psb_msort_unique(ladj,nnp) +!!$ !write(0,*) me,name,' Processes:',ladj(1:nnp) +!!$ +!!$ icomm = desc_a%get_mpic() +!!$ sdsz(:)=0 +!!$ rvsz(:)=0 +!!$ sdsi(:)=0 +!!$ rvsi(:)=0 +!!$ ipx = 1 +!!$ brvindx(:) = 0 +!!$ bsdindx(:) = 0 +!!$ counter=1 +!!$ idx = 0 +!!$ idxs = 0 +!!$ idxr = 0 +!!$ do i=1,nz +!!$ if (iprc(i) >=0) then +!!$ sdsz(iprc(i)+1) = sdsz(iprc(i)+1) +1 +!!$ else +!!$ write(0,*)me,name,' Error from fnd_owner: ',iprc(i) +!!$ end if +!!$ end do +!!$ call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& +!!$ & rvsz,1,psb_mpi_mpk_,icomm,minfo) +!!$ if (minfo /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='mpi_alltoall') +!!$ goto 9999 +!!$ end if +!!$ !write(0,*)me,name,' sdsz ',sdsz(:),' rvsz:',rvsz(:) +!!$ nsnds = count(sdsz /= 0) +!!$ nrcvs = count(rvsz /= 0) +!!$ idxs = 0 +!!$ idxr = 0 +!!$ counter = 1 +!!$ Do proc=0,np-1 +!!$ bsdindx(proc+1) = idxs +!!$ idxs = idxs + sdsz(proc+1) +!!$ brvindx(proc+1) = idxr +!!$ idxr = idxr + rvsz(proc+1) +!!$ Enddo +!!$ +!!$ iszs = sum(sdsz) +!!$ iszr = sum(rvsz) +!!$ call acoo%allocate(desc_a%get_global_rows(),desc_a%get_global_cols(),iszr) +!!$ if (psb_errstatus_fatal()) then +!!$ write(0,*) 'Error from acoo%allocate ' +!!$ info = 4010 +!!$ goto 9999 +!!$ end if +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& +!!$ & ' Send:',sdsz(:),' Receive:',rvsz(:) +!!$ !write(debug_unit,*) me,' ',trim(name),': ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' iasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' jasnd: ',info +!!$ if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) +!!$ !write(debug_unit,*) me,' ',trim(name),' valsnd: ',info +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='ensure_size') +!!$ goto 9999 +!!$ end if +!!$ do k=1, nz +!!$ proc = iprc(k) +!!$ sdsi(proc+1) = sdsi(proc+1) + 1 +!!$ !rvsi(proc) = rvsi(proc) + 1 +!!$ iasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ia(k) +!!$ jasnd(bsdindx(proc+1)+sdsi(proc+1)) = a%ja(k) +!!$ valsnd(bsdindx(proc+1)+sdsi(proc+1)) = a%val(k) +!!$ end do +!!$ do proc=0,np-1 +!!$ if (sdsi(proc+1) /= sdsz(proc+1)) & +!!$ & write(0,*) me,name,'Send mismacth ',sdsi(proc+1),sdsz(proc+1) +!!$ end do +!!$ +!!$ select case(psb_get_sp_a2av_alg()) +!!$ case(psb_sp_a2av_smpl_triad_) +!!$ call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& +!!$ & acoo%val,acoo%ia,acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_smpl_v_) +!!$ call psb_simple_a2av(valsnd,sdsz,bsdindx,& +!!$ & acoo%val,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& +!!$ & acoo%ia,rvsz,brvindx,ctxt,info) +!!$ if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& +!!$ & acoo%ja,rvsz,brvindx,ctxt,info) +!!$ case(psb_sp_a2av_mpi_) +!!$ +!!$ call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_dpk_,& +!!$ & acoo%val,rvsz,brvindx,psb_mpi_c_dpk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ia,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo == mpi_success) & +!!$ & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& +!!$ & acoo%ja,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) +!!$ if (minfo /= mpi_success) info = minfo +!!$ case default +!!$ info = psb_err_internal_error_ +!!$ call psb_errpush(info,name,a_err='wrong A2AV alg selector') +!!$ goto 9999 +!!$ end select +!!$ +!!$ if (info /= psb_success_) then +!!$ info=psb_err_from_subroutine_ +!!$ call psb_errpush(info,name,a_err='alltoallv') +!!$ goto 9999 +!!$ end if +!!$ call acoo%set_nzeros(iszr) +!!$ call acoo%mv_to_coo(b,info) +!!$ +!!$ Deallocate(brvindx,bsdindx,rvsz,sdsz,& +!!$ & iasnd,jasnd,valsnd,stat=info) +!!$ if (debug_level >= psb_debug_outer_)& +!!$ & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ctxt,err_act) + + return + +End Subroutine psb_z_remote_vect + + diff --git a/base/tools/psb_zasb.f90 b/base/tools/psb_zasb.f90 index 34706841..050558ac 100644 --- a/base/tools/psb_zasb.f90 +++ b/base/tools/psb_zasb.f90 @@ -51,7 +51,7 @@ ! scratch - logical, optional If true, allocate without checking/zeroing contents. ! default: .false. ! -subroutine psb_zasb_vect(x, desc_a, info, mold, scratch) +subroutine psb_zasb_vect(x, desc_a, info, mold, dupl,scratch) use psb_base_mod, psb_protect_name => psb_zasb_vect implicit none @@ -59,12 +59,13 @@ subroutine psb_zasb_vect(x, desc_a, info, mold, scratch) type(psb_z_vect_type), intent(inout) :: x integer(psb_ipk_), intent(out) :: info class(psb_z_base_vect_type), intent(in), optional :: mold + integer(psb_ipk_), optional, intent(in) :: dupl logical, intent(in), optional :: scratch ! local variables type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np,me - integer(psb_ipk_) :: i1sz,nrow,ncol, err_act + integer(psb_ipk_) :: i1sz,nrow,ncol, err_act, dupl_ logical :: scratch_ integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name,ch_err @@ -82,6 +83,11 @@ subroutine psb_zasb_vect(x, desc_a, info, mold, scratch) scratch_ = .false. if (present(scratch)) scratch_ = scratch + if (present(dupl)) then + dupl_ = dupl + else + dupl_ = psb_dupl_ovwrt_ + endif call psb_info(ctxt, me, np) ! ....verify blacs grid correctness.. @@ -104,6 +110,7 @@ subroutine psb_zasb_vect(x, desc_a, info, mold, scratch) call x%free(info) call x%bld(ncol,mold=mold) else + if (x%is_remote_build()) call psb_z_remote_vect(x,desc_a,dupl_, info) call x%asb(ncol,info) ! ..update halo elements.. call psb_halo(x,desc_a,info) diff --git a/base/tools/psb_zins.f90 b/base/tools/psb_zins.f90 index 19020379..089a1da7 100644 --- a/base/tools/psb_zins.f90 +++ b/base/tools/psb_zins.f90 @@ -112,7 +112,6 @@ subroutine psb_zins_vect(m, irw, val, x, desc_a, info, dupl,local) endif - allocate(irl(m),stat=info) if (info /= psb_success_) then info = psb_err_alloc_dealloc_ diff --git a/base/tools/psb_zspalloc.f90 b/base/tools/psb_zspalloc.f90 index 3b0fc7d2..741b3fb7 100644 --- a/base/tools/psb_zspalloc.f90 +++ b/base/tools/psb_zspalloc.f90 @@ -116,22 +116,28 @@ subroutine psb_zspalloc(a, desc_a, info, nnz, bldmode) !!$ write(0,*) name,'Setting a%remote_build ',& !!$ & bldmode_,psb_matbld_noremote_,psb_matbld_remote_ - a%remote_build = bldmode_ - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - !write(0,*) name,' matbld_noremote_ nothing needed' - case (psb_matbld_remote_) - !write(0,*) name,' matbld_remote_ start ' + call a%set_remote_build(bldmode_) + if (a%is_remote_build()) then allocate(a%rmta) nnzrmt_ = max(100,(nnz_/100)) call a%rmta%allocate(m,n,nnzrmt_) - - case default - write(0,*) name,'Invalid value for remote_build ' - a%remote_build = psb_matbld_noremote_ - end select + end if + +!!$ a%remote_build = bldmode_ +!!$ select case(a%remote_build) +!!$ case (psb_matbld_noremote_) +!!$ ! nothing needed +!!$ !write(0,*) name,' matbld_noremote_ nothing needed' +!!$ case (psb_matbld_remote_) +!!$ !write(0,*) name,' matbld_remote_ start ' +!!$ allocate(a%rmta) +!!$ nnzrmt_ = max(100,(nnz_/100)) +!!$ call a%rmta%allocate(m,n,nnzrmt_) +!!$ +!!$ case default +!!$ write(0,*) name,'Invalid value for remote_build ' +!!$ a%remote_build = psb_matbld_noremote_ +!!$ end select if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': ', & diff --git a/base/tools/psb_zspasb.f90 b/base/tools/psb_zspasb.f90 index 9c42ae85..a7fc5019 100644 --- a/base/tools/psb_zspasb.f90 +++ b/base/tools/psb_zspasb.f90 @@ -106,11 +106,7 @@ subroutine psb_zspasb(a,desc_a, info, afmt, upd, dupl, mold) ! ! First case: we come from a fresh build. ! - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! nothing needed - case (psb_matbld_remote_) + if (a%is_remote_build()) then !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() block type(psb_lz_coo_sparse_mat) :: a_add @@ -143,11 +139,42 @@ subroutine psb_zspasb(a,desc_a, info, afmt, upd, dupl, mold) if (nzt > 0) call psb_cdasb(desc_a,info,mold=ivm) end block - end select - call a%set_ncols(desc_a%get_local_cols()) - + end if + call a%set_ncols(desc_a%get_local_cols()) call a%cscnv(info,type=afmt,dupl=dupl, mold=mold) else if (a%is_upd()) then + if (a%is_remote_build()) then + !write(0,*) me,name,' Size of rmta:',a%rmta%get_nzeros() + block + type(psb_lz_coo_sparse_mat) :: a_add + integer(psb_ipk_), allocatable :: ila(:), jla(:) + integer(psb_ipk_) :: nz, nzt,k + call psb_remote_mat(a%rmta,desc_a,a_add,info) + nz = a_add%get_nzeros() +!!$ write(0,*) me,name,' Nz to be added',nz + nzt = nz + call psb_sum(ctxt,nzt) + if (nzt>0) then + allocate(ivm, mold=desc_a%v_halo_index%v) + call psb_cd_reinit(desc_a, info) + end if + if (nz > 0) then + ! + ! Should we check for new indices here? + ! + call psb_realloc(nz,ila,info) + call psb_realloc(nz,jla,info) + call desc_a%indxmap%g2l(a_add%ia(1:nz),ila(1:nz),info,owned=.true.) + if (info == 0) call desc_a%indxmap%g2l_ins(a_add%ja(1:nz),jla(1:nz),info) + !write(0,*) me,name,' Check before insert',a%get_nzeros() + n_row = desc_a%get_local_rows() + n_col = desc_a%get_local_cols() + call a%set_ncols(desc_a%get_local_cols()) + call a%csput(nz,ila,jla,a_add%val,ione,n_row,ione,n_col,info) + !write(0,*) me,name,' Check after insert',a%get_nzeros(),nz + end if + end block + end if call a%asb(mold=mold) else info = psb_err_invalid_mat_state_ diff --git a/base/tools/psb_zspins.F90 b/base/tools/psb_zspins.F90 index 835a4d04..abe64251 100644 --- a/base/tools/psb_zspins.F90 +++ b/base/tools/psb_zspins.F90 @@ -151,11 +151,8 @@ subroutine psb_zspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -173,9 +170,7 @@ subroutine psb_zspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select + end if else info = psb_err_invalid_a_and_cd_state_ @@ -208,10 +203,7 @@ subroutine psb_zspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call psb_errpush(info,name,a_err='a%csput') goto 9999 end if - select case(a%remote_build) - case (psb_matbld_noremote_) - ! Do nothing - case (psb_matbld_remote_) + if (a%is_remote_build()) then nnl = count(ila(1:nz)<0) if (nnl > 0) then !write(0,*) 'Check on insert ',nnl @@ -229,10 +221,7 @@ subroutine psb_zspins(nz,ia,ja,val,a,desc_a,info,rebuild,local) call a%rmta%csput(nnl,lila,ljla,lval,1_psb_lpk_,desc_a%get_global_rows(),& & 1_psb_lpk_,desc_a%get_global_rows(),info) end if - case default - write(0,*) name,' Ignoring wrong value for %remote_build' - end select - + end if else info = psb_err_invalid_cd_state_ call psb_errpush(info,name)