diff --git a/base/comm/psb_dscatter_a.F90 b/base/comm/psb_dscatter_a.F90 index 0f3be5aa..8e10d28b 100644 --- a/base/comm/psb_dscatter_a.F90 +++ b/base/comm/psb_dscatter_a.F90 @@ -63,7 +63,7 @@ subroutine psb_dscatterm(globx, locx, desc_a, info, root) ! locals type(psb_ctxt_type) :: ctxt - integer(psb_mpk_) :: np, me, iroot, icomm, myrank, rootrank, iam, nlr + integer(psb_mpk_) :: np, iam, iroot, icomm, myrank, rootrank, nlr integer(psb_ipk_) :: ierr(5), err_act, nrow,& & ilocx, jlocx, lda_locx, lda_globx, lock, globk, k, maxk, & & col,pos @@ -102,29 +102,24 @@ subroutine psb_dscatterm(globx, locx, desc_a, info, root) iroot = psb_root_ end if + icomm = psb_get_mpi_comm(ctxt) + myrank = psb_get_mpi_rank(ctxt,iam) + iglobx = 1 jglobx = 1 - lda_globx = size(globx,1) - m = desc_a%get_global_rows() - n = desc_a%get_global_cols() - icomm = psb_get_mpi_comm(ctxt) - myrank = psb_get_mpi_rank(ctxt,me) - - if (iroot==-1) then - lda_globx = size(globx, 1) + ! Get col number K and broadcast to other processes + if ((iroot==-1).or.(iam==iroot)) then + lda_globx = size(globx,1) k = size(globx,2) + call psb_bcast(ctxt,k,root=iroot) else - if (iam==iroot) then - k = size(globx,2) - lda_globx = size(globx, 1) - end if + call psb_bcast(ctxt,k,root=iroot) end if - + m = desc_a%get_global_rows() n = desc_a%get_global_cols() - ! there should be a global check on k here!!! if ((iroot==-1).or.(iam==iroot)) & & call psb_chkglobvect(m,n,lda_globx,iglobx,jglobx,desc_a,info) @@ -211,7 +206,7 @@ subroutine psb_dscatterm(globx, locx, desc_a, info, root) ! scatter call mpi_scatterv(scatterv,all_dim,displ,& - & psb_mpi_r_dpk_,locx(1,col),nrow,& + & psb_mpi_r_dpk_,locx(:,col),nrow,& & psb_mpi_r_dpk_,rootrank,icomm,info) end do diff --git a/base/modules/psblas/psb_d_psblas_mod.F90 b/base/modules/psblas/psb_d_psblas_mod.F90 index 6ea1d499..b2739d7a 100644 --- a/base/modules/psblas/psb_d_psblas_mod.F90 +++ b/base/modules/psblas/psb_d_psblas_mod.F90 @@ -45,27 +45,25 @@ module psb_d_psblas_mod integer(psb_ipk_), intent(out) :: info logical, intent(in), optional :: global end function psb_ddot_vect - function psb_ddot_multivect(x, y, desc_a,info,t,global) result(res) + function psb_ddot_multivect_col_v(x, y, desc_a,info,global) result(res) import :: psb_desc_type, psb_dpk_, psb_ipk_, & & psb_d_multivect_type, psb_dspmat_type real(psb_dpk_), allocatable :: res(:,:) type(psb_d_multivect_type), intent(inout) :: x, y - logical, optional, intent(in) :: t type(psb_desc_type), intent(in) :: desc_a integer(psb_ipk_), intent(out) :: info logical, intent(in), optional :: global - end function psb_ddot_multivect - function psb_ddot_multivect_1(x, y, desc_a,info,t,global) result(res) + end function psb_ddot_multivect_col_v + function psb_ddot_multivect_row_a(x, y, desc_a,info,global) result(res) import :: psb_desc_type, psb_dpk_, psb_ipk_, & & psb_d_multivect_type, psb_dspmat_type real(psb_dpk_), allocatable :: res(:,:) type(psb_d_multivect_type), intent(inout) :: x real(psb_dpk_), intent(in) :: y(:,:) - logical, optional, intent(in) :: t type(psb_desc_type), intent(in) :: desc_a integer(psb_ipk_), intent(out) :: info logical, intent(in), optional :: global - end function psb_ddot_multivect_1 + end function psb_ddot_multivect_row_a function psb_ddotv(x, y, desc_a,info,global) import :: psb_desc_type, psb_dpk_, psb_ipk_, & & psb_d_vect_type, psb_dspmat_type @@ -129,7 +127,7 @@ module psb_d_psblas_mod type(psb_desc_type), intent (in) :: desc_a integer(psb_ipk_), intent(out) :: info end subroutine psb_daxpby_multivect - subroutine psb_daxpby_multivect_1(alpha, x, beta, y, desc_a, info) + subroutine psb_daxpby_multivect_a(alpha, x, beta, y, desc_a, info) import :: psb_desc_type, psb_dpk_, psb_ipk_, & & psb_d_multivect_type, psb_dspmat_type real(psb_dpk_), intent(in) :: x(:,:) @@ -137,7 +135,7 @@ module psb_d_psblas_mod real(psb_dpk_), intent (in) :: alpha, beta type(psb_desc_type), intent (in) :: desc_a integer(psb_ipk_), intent(out) :: info - end subroutine psb_daxpby_multivect_1 + end subroutine psb_daxpby_multivect_a subroutine psb_daxpby_vect_out(alpha, x, beta, y,& & z, desc_a, info) import :: psb_desc_type, psb_dpk_, psb_ipk_, & diff --git a/base/modules/serial/psb_d_base_vect_mod.F90 b/base/modules/serial/psb_d_base_vect_mod.F90 index 707ad9ba..d2438aa2 100644 --- a/base/modules/serial/psb_d_base_vect_mod.F90 +++ b/base/modules/serial/psb_d_base_vect_mod.F90 @@ -2246,14 +2246,17 @@ module psb_d_base_multivect_mod procedure, pass(x) :: set_vect => d_base_mlv_set_vect generic, public :: set => set_vect, set_scal ! - ! Dot product and AXPBY + ! TODO Dot product (col-by-col and row-by-col) and AXPBY ! - procedure, pass(x) :: dot_v => d_base_mlv_dot_v - procedure, pass(x) :: dot_a => d_base_mlv_dot_a - generic, public :: dot => dot_v, dot_a - procedure, pass(y) :: axpby_v => d_base_mlv_axpby_v - procedure, pass(y) :: axpby_a => d_base_mlv_axpby_a - generic, public :: axpby => axpby_v, axpby_a + procedure, pass(x) :: dot_col_v => d_base_mlv_dot_col_v + procedure, pass(x) :: dot_col_a => d_base_mlv_dot_col_a + generic, public :: dot_col => dot_col_v, dot_col_a + procedure, pass(x) :: dot_row_v => d_base_mlv_dot_row_v + procedure, pass(x) :: dot_row_a => d_base_mlv_dot_row_a + generic, public :: dot_row => dot_row_v, dot_row_a + procedure, pass(y) :: axpby_v => d_base_mlv_axpby_v + procedure, pass(y) :: axpby_a => d_base_mlv_axpby_a + generic, public :: axpby => axpby_v, axpby_a ! ! MultiVector by vector/multivector multiplication. Need all variants ! to handle multiple requirements from preconditioners @@ -2358,14 +2361,22 @@ contains real(psb_dpk_), intent(in) :: this(:,:) class(psb_d_base_multivect_type), intent(inout) :: x integer(psb_ipk_) :: info + integer(psb_ipk_) :: i call psb_realloc(size(this,1),size(this,2),x%v,info) if (info /= 0) then call psb_errpush(psb_err_alloc_dealloc_,'base_mlv_vect_bld') return end if - x%v(:,:) = this(:,:) +#if defined (OPENMP) + !$omp parallel do private(i) + do i = 1, size(this,1) + x%v(i,:) = this(i,:) + end do +#else + x%v(:,:) = this(:,:) +#endif end subroutine d_base_mlv_bld_x ! @@ -2801,31 +2812,24 @@ contains end subroutine d_base_mlv_set_vect - ! - ! Dot products + ! TODO + ! Col Dot products ! ! - !> Function base_mlv_dot_v + !> Function base_mlv_dot_col_v !! \memberof psb_d_base_multivect_type - !! \brief Dot product by another base_mlv_vector + !! \brief Col-by-col mult using dot product by a mlv + !! \param nr Number of rows to be considered !! \param y The other (base_mlv_vect) to be multiplied by - !! \param res Result matrix - !! \param t If true, x is transposed + !! \param res Result vector !! - subroutine d_base_mlv_dot_v(x,y,res,t) + function d_base_mlv_dot_col_v(nr,x,y) result(res) implicit none class(psb_d_base_multivect_type), intent(inout) :: x, y - real(psb_dpk_), intent(inout) :: res(:,:) - logical, optional, intent(in) :: t - logical :: t_ - external :: dgemm - integer(psb_ipk_) :: x_m, x_n, y_m, y_n - - if (present(t)) then - t_ = t - else - t_ = .false. - end if + integer(psb_ipk_), intent(in) :: nr + real(psb_dpk_), allocatable :: res(:,:) + real(psb_dpk_), external :: ddot + integer(psb_ipk_) :: i, j, n_x, n_y if (x%is_dev()) call x%sync() ! @@ -2835,65 +2839,126 @@ contains ! If Y is not, throw the burden on it, implicitly ! calling dot_a ! - x_m = x%get_nrows() - x_n = x%get_ncols() - y_m = y%get_nrows() - y_n = y%get_ncols() select type(yy => y) type is (psb_d_base_multivect_type) if (y%is_dev()) call y%sync() - if (t_) then - call dgemm('T','N',x_n,y_n,x_n,done,x%v,x_n,y%v,y_m,dzero,res,x_n) - else - call dgemm('N','N',x_m,y_n,x_n,done,x%v,x_m,y%v,y_m,dzero,res,x_m) - end if + n_x = psb_size(x%v,2_psb_ipk_) + n_y = psb_size(y%v,2_psb_ipk_) + allocate(res(n_x,n_y)) + do i=1,n_x + do j=1,n_y + res(i,j) = ddot(nr,x%v(1:nr,i),1,y%v(1:nr,j),1) + end do + end do class default - call x%dot(y%v,res,t) + res = x%dot_col(nr,y%v) end select - end subroutine d_base_mlv_dot_v + end function d_base_mlv_dot_col_v ! ! Base workhorse is good old BLAS1 ! ! - !> Function base_mlv_dot_a + !> Function base_mlv_dot_col_a !! \memberof psb_d_base_multivect_type - !! \brief Dot product by a normal array - !! \param n Number of entries to be considered + !! \brief Col-by-col mult using dot product by a normal array + !! \param nr Number of rows to be considered !! \param y(:,:) The array to be multiplied by - !! \param res Result matrix - !! \param t If true, x is transposed + !! \param res Result vector !! - subroutine d_base_mlv_dot_a(x,y,res,t) + function d_base_mlv_dot_col_a(nr,x,y) result(res) class(psb_d_base_multivect_type), intent(inout) :: x real(psb_dpk_), intent(in) :: y(:,:) - real(psb_dpk_), intent(inout) :: res(:,:) - logical, optional, intent(in) :: t - logical :: t_ - external :: dgemm - integer(psb_ipk_) :: x_m, x_n, y_m, y_n - - if (present(t)) then - t_ = t - else - t_ = .false. - end if + real(psb_dpk_), allocatable :: res(:,:) + real(psb_dpk_), external :: ddot + integer(psb_ipk_) :: i, j, n_x, n_y if (x%is_dev()) call x%sync() + n_x = psb_size(x%v,2_psb_ipk_) + n_y = size(y,2_psb_ipk_) + allocate(res(n_x,n_y)) + do i=1,n_x + do j=1,n_y + res(i,j) = ddot(nr,x%v(1:nr,i),1,y(1:nr,j),1) + end do + end do - x_m = x%get_nrows() - x_n = x%get_ncols() - y_m = size(y,dim=1) - y_n = size(y,dim=2) + end function d_base_mlv_dot_col_a - if (t_) then - call dgemm('T','N',x_n,y_n,x_n,done,x%v,x_n,y,y_m,dzero,res,x_n) - else - call dgemm('N','N',x_m,y_n,x_n,done,x%v,x_m,y,y_m,dzero,res,x_m) - end if + ! + ! Row Dot products + ! + ! + !> Function base_mlv_dot_col_v + !! \memberof psb_d_base_multivect_type + !! \brief Row-by-col mult using dot product by mlv + !! \param nr Number of rows to be considered + !! \param y The other (base_mlv_vect) to be multiplied by + !! \param res Result vector + !! + function d_base_mlv_dot_row_v(nr,x,y) result(res) + implicit none + class(psb_d_base_multivect_type), intent(inout) :: x, y + integer(psb_ipk_), intent(in) :: nr + real(psb_dpk_), allocatable :: res(:,:) + real(psb_dpk_), external :: ddot + integer(psb_ipk_) :: i, j, n_x, n_y + + if (x%is_dev()) call x%sync() + ! + ! Note: this is the base implementation. + ! When we get here, we are sure that X is of + ! TYPE psb_d_base_mlv_vect (or its class does not care). + ! If Y is not, throw the burden on it, implicitly + ! calling dot_a + ! + select type(yy => y) + type is (psb_d_base_multivect_type) + if (y%is_dev()) call y%sync() + n_x = psb_size(x%v,2_psb_ipk_) + n_y = psb_size(y%v,2_psb_ipk_) + allocate(res(nr,n_y)) + do i=1,nr + do j=1,n_y + res(i,j) = ddot(n_x,x%v(i,:),1,y%v(:,j),1) + end do + end do + class default + res = x%dot_row(nr,y%v) + end select + + end function d_base_mlv_dot_row_v + + ! + ! Base workhorse is good old BLAS1 + ! + ! + !> Function base_mlv_dot_row_a + !! \memberof psb_d_base_multivect_type + !! \brief Row-by-col mult using dot product by a normal array + !! \param nr Number of rows to be considered + !! \param y(:,:) The array to be multiplied by + !! \param res Result vector + !! + function d_base_mlv_dot_row_a(nr,x,y) result(res) + class(psb_d_base_multivect_type), intent(inout) :: x + real(psb_dpk_), intent(in) :: y(:,:) + real(psb_dpk_), allocatable :: res(:,:) + real(psb_dpk_), external :: ddot + integer(psb_ipk_) :: i, j, n_x, n_y - end subroutine d_base_mlv_dot_a + if (x%is_dev()) call x%sync() + n_x = psb_size(x%v,2_psb_ipk_) + n_y = size(y,2_psb_ipk_) + allocate(res(psb_size(x%v,1_psb_ipk_),n_y)) + do i=1,nr + do j=1,n_y + res(i,j) = ddot(n_x,x%v(i,:),1,y(:,j),1) + end do + end do + + end function d_base_mlv_dot_row_a ! ! AXPBY is invoked via Y, hence the structure below. @@ -2928,7 +2993,7 @@ contains select type(xx => x) type is (psb_d_base_multivect_type) call psb_geaxpby(m,nc,alpha,x%v,beta,y%v,info) - class default + class default call y%axpby(m,alpha,x%v,beta,info,n=n) end select @@ -3215,17 +3280,17 @@ contains !> Function base_mlv_nrm2 !! \memberof psb_d_base_multivect_type !! \brief 2-norm |x(1:n)|_2 - !! \param nc how many entries to consider - function d_base_mlv_nrm2(nc,x) result(res) + !! \param nr how many rows to consider + function d_base_mlv_nrm2(nr,x) result(res) implicit none class(psb_d_base_multivect_type), intent(inout) :: x - integer(psb_ipk_), intent(in) :: nc + integer(psb_ipk_), intent(in) :: nr real(psb_dpk_), allocatable :: res real(psb_dpk_), external :: dnrm2 - integer(psb_ipk_) :: j, nr + integer(psb_ipk_) :: j, nc if (x%is_dev()) call x%sync() - nr = x%get_nrows() + nc = x%get_ncols() res = dnrm2(nc*nr,x%v,1) end function d_base_mlv_nrm2 @@ -3234,16 +3299,16 @@ contains !> Function base_mlv_amax !! \memberof psb_d_base_multivect_type !! \brief infinity-norm |x(1:n)|_\infty - !! \param nc how many entries to consider - function d_base_mlv_amax(nc,x) result(res) + !! \param nr how many rows to consider + function d_base_mlv_amax(nr,x) result(res) implicit none class(psb_d_base_multivect_type), intent(inout) :: x - integer(psb_ipk_), intent(in) :: nc + integer(psb_ipk_), intent(in) :: nr real(psb_dpk_) :: res - integer(psb_ipk_) :: i, nr + integer(psb_ipk_) :: i, nc if (x%is_dev()) call x%sync() - nr = x%get_nrows() + nc = x%get_ncols() res = 0 do i=1,nr res = max(res,sum(abs(x%v(i,1:nc)))) @@ -3255,16 +3320,16 @@ contains !> Function base_mlv_asum !! \memberof psb_d_base_multivect_type !! \brief 1-norm |x(1:n)|_1 - !! \param nc how many entries to consider - function d_base_mlv_asum(nc,x) result(res) + !! \param nr how many rows to consider + function d_base_mlv_asum(nr,x) result(res) implicit none class(psb_d_base_multivect_type), intent(inout) :: x - integer(psb_ipk_), intent(in) :: nc + integer(psb_ipk_), intent(in) :: nr real(psb_dpk_), allocatable :: res - integer(psb_ipk_) :: j, nr + integer(psb_ipk_) :: j, nc if (x%is_dev()) call x%sync() - nr = x%get_nrows() + nc = x%get_ncols() res = 0 do j=1,nc res = max(res,sum(abs(x%v(1:nr,j)))) @@ -3314,10 +3379,10 @@ contains !! \param info Return code !! \brief QR factorization ! - subroutine d_base_mlv_qr_fact(x, res, info) + function d_base_mlv_qr_fact(x, info) result(res) implicit none class(psb_d_base_multivect_type), intent(inout) :: x - real(psb_dpk_), intent(inout) :: res(:,:) + real(psb_dpk_), allocatable :: res(:,:) real(psb_dpk_), allocatable :: tau(:), work(:) integer(psb_ipk_) :: i, j, m, n, lda, lwork integer(psb_ipk_), intent(out) :: info @@ -3330,7 +3395,7 @@ contains n = x%get_ncols() lda = m lwork = n - allocate(tau(n), work(lwork)) + allocate(tau(n), work(lwork), res(n,n)) ! Perform QR factorization call dgeqrf(m, n, x%v, lda, tau, work, lwork, info) @@ -3348,7 +3413,7 @@ contains deallocate(tau, work) - end subroutine d_base_mlv_qr_fact + end function d_base_mlv_qr_fact function d_base_mlv_use_buffer() result(res) implicit none diff --git a/base/modules/serial/psb_d_vect_mod.F90 b/base/modules/serial/psb_d_vect_mod.F90 index 4647c2e9..d432f8f3 100644 --- a/base/modules/serial/psb_d_vect_mod.F90 +++ b/base/modules/serial/psb_d_vect_mod.F90 @@ -1389,12 +1389,15 @@ module psb_d_multivect_mod ! ! Dot product and AXPBY ! - procedure, pass(x) :: dot_v => d_vect_dot_v - procedure, pass(x) :: dot_a => d_vect_dot_a - generic, public :: dot => dot_v, dot_a - procedure, pass(y) :: axpby_v => d_vect_axpby_v - procedure, pass(y) :: axpby_a => d_vect_axpby_a - generic, public :: axpby => axpby_v, axpby_a + procedure, pass(x) :: dot_col_v => d_vect_dot_col_v + procedure, pass(x) :: dot_col_a => d_vect_dot_col_a + generic, public :: dot_col => dot_col_v, dot_col_a + procedure, pass(x) :: dot_row_v => d_vect_dot_row_v + procedure, pass(x) :: dot_row_a => d_vect_dot_row_a + generic, public :: dot_row => dot_row_v, dot_row_a + procedure, pass(y) :: axpby_v => d_vect_axpby_v + procedure, pass(y) :: axpby_a => d_vect_axpby_a + generic, public :: axpby => axpby_v, axpby_a ! ! MultiVector by vector/multivector multiplication. Need all variants ! to handle multiple requirements from preconditioners @@ -1548,9 +1551,11 @@ contains class(psb_d_multivect_type), intent(out) :: x class(psb_d_base_multivect_type), intent(in), optional :: mold integer(psb_ipk_) :: info - class(psb_d_base_multivect_type), pointer :: mld info = psb_success_ + if (allocated(x%v)) & + & call x%free(info) + if (present(mold)) then allocate(x%v,stat=info,mold=mold) else @@ -1837,41 +1842,67 @@ contains class(psb_d_base_multivect_type), allocatable :: tmp integer(psb_ipk_) :: info + info = psb_success_ if (present(mold)) then allocate(tmp,stat=info,mold=mold) else - allocate(tmp,stat=info, mold=psb_d_get_base_multivect_default()) + allocate(tmp,stat=info,mold=psb_d_get_base_multivect_default()) endif if (allocated(x%v)) then - call x%v%sync() - if (info == psb_success_) call tmp%bld(x%v%v) - call x%v%free(info) + if (allocated(x%v%v)) then + call x%v%sync() + if (info == psb_success_) call tmp%bld(x%v%v) + call x%v%free(info) + endif end if call move_alloc(tmp,x%v) end subroutine d_vect_cnv - subroutine d_vect_dot_v(x,y,res,t) + function d_vect_dot_col_v(nr,x,y) result(res) + implicit none + class(psb_d_multivect_type), intent(inout) :: x, y + integer(psb_ipk_), intent(in) :: nr + real(psb_dpk_), allocatable :: res(:,:) + + if (allocated(x%v).and.allocated(y%v)) & + & res = x%v%dot_col(nr,y%v) + + end function d_vect_dot_col_v + + function d_vect_dot_col_a(nr,x,y) result(res) + implicit none + class(psb_d_multivect_type), intent(inout) :: x + real(psb_dpk_), intent(in) :: y(:,:) + integer(psb_ipk_), intent(in) :: nr + real(psb_dpk_), allocatable :: res(:,:) + + if (allocated(x%v)) & + & res = x%v%dot_col(nr,y) + + end function d_vect_dot_col_a + + function d_vect_dot_row_v(nr,x,y) result(res) implicit none class(psb_d_multivect_type), intent(inout) :: x, y - real(psb_dpk_), intent(inout) :: res(:,:) - logical, optional, intent(in) :: t + integer(psb_ipk_), intent(in) :: nr + real(psb_dpk_), allocatable :: res(:,:) if (allocated(x%v).and.allocated(y%v)) & - & call x%v%dot(y%v,res,t) + & res = x%v%dot_row(nr,y%v) - end subroutine d_vect_dot_v + end function d_vect_dot_row_v - subroutine d_vect_dot_a(x,y,res,t) + function d_vect_dot_row_a(nr,x,y) result(res) implicit none class(psb_d_multivect_type), intent(inout) :: x real(psb_dpk_), intent(in) :: y(:,:) - real(psb_dpk_), intent(inout) :: res(:,:) - logical, optional, intent(in) :: t + integer(psb_ipk_), intent(in) :: nr + real(psb_dpk_), allocatable :: res(:,:) if (allocated(x%v)) & - & call x%v%dot(y,res,t) + & res = x%v%dot_row(nr,y) - end subroutine d_vect_dot_a + end function d_vect_dot_row_a subroutine d_vect_axpby_v(m,alpha, x, beta, y, info) use psi_serial_mod @@ -2014,57 +2045,57 @@ contains !!$ !!$ - function d_vect_nrm2(nc,x) result(res) + function d_vect_nrm2(nr,x) result(res) implicit none class(psb_d_multivect_type), intent(inout) :: x - integer(psb_ipk_), intent(in) :: nc + integer(psb_ipk_), intent(in) :: nr real(psb_dpk_), allocatable :: res if (allocated(x%v)) then - res = x%v%nrm2(nc) + res = x%v%nrm2(nr) else res = dzero end if end function d_vect_nrm2 - function d_vect_amax(nc,x) result(res) + function d_vect_amax(nr,x) result(res) implicit none class(psb_d_multivect_type), intent(inout) :: x - integer(psb_ipk_), intent(in) :: nc + integer(psb_ipk_), intent(in) :: nr real(psb_dpk_) :: res if (allocated(x%v)) then - res = x%v%amax(nc) + res = x%v%amax(nr) else res = dzero end if end function d_vect_amax - function d_vect_asum(nc,x) result(res) + function d_vect_asum(nr,x) result(res) implicit none class(psb_d_multivect_type), intent(inout) :: x - integer(psb_ipk_), intent(in) :: nc + integer(psb_ipk_), intent(in) :: nr real(psb_dpk_) :: res if (allocated(x%v)) then - res = x%v%asum(nc) + res = x%v%asum(nr) else res = dzero end if end function d_vect_asum - subroutine d_vect_qr_fact(x, res, info) + function d_vect_qr_fact(x, info) result(res) implicit none class(psb_d_multivect_type), intent(inout) :: x - real(psb_dpk_), intent(inout) :: res(:,:) + real(psb_dpk_), allocatable :: res(:,:) integer(psb_ipk_), intent(out) :: info if (allocated(x%v)) then - call x%v%qr_fact(res, info) + res = x%v%qr_fact(info) endif - end subroutine d_vect_qr_fact + end function d_vect_qr_fact end module psb_d_multivect_mod diff --git a/base/psblas/psb_damax.f90 b/base/psblas/psb_damax.f90 index a36cad94..d29d5971 100644 --- a/base/psblas/psb_damax.f90 +++ b/base/psblas/psb_damax.f90 @@ -417,10 +417,9 @@ function psb_damax_multivect(x, desc_a, info, global) result(res) ix = 1 jx = 1 - m = x%get_nrows() - n = x%get_ncols() + m = desc_a%get_global_rows() - call psb_chkvect(m,n,x%get_nrows(),ix,jx,desc_a,info,iix,jjx) + call psb_chkvect(m,x%get_ncols(),x%get_nrows(),ix,jx,desc_a,info,iix,jjx) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect' @@ -436,7 +435,7 @@ function psb_damax_multivect(x, desc_a, info, global) result(res) ! compute local max if ((desc_a%get_local_rows() > 0).and.(m /= 0)) then - res = x%amax(x%get_ncols()) + res = x%amax(desc_a%get_local_rows()) else res = dzero end if diff --git a/base/psblas/psb_daxpby.f90 b/base/psblas/psb_daxpby.f90 index 9ef7d5b2..66f0f9b2 100644 --- a/base/psblas/psb_daxpby.f90 +++ b/base/psblas/psb_daxpby.f90 @@ -159,7 +159,7 @@ subroutine psb_daxpby_multivect(alpha, x, beta, y, desc_a, info) ! locals type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np, me, err_act, iix, jjx, iiy, jjy - integer(psb_lpk_) :: ix, ijx, iy, ijy, x_m, x_n, y_m, y_n + integer(psb_lpk_) :: ix, ijx, iy, ijy, m character(len=20) :: name, ch_err name='psb_dgeaxpby' @@ -192,21 +192,17 @@ subroutine psb_daxpby_multivect(alpha, x, beta, y, desc_a, info) iy = ione ijy = ione - x_m = x%get_nrows() - x_n = x%get_ncols() - - y_m = y%get_nrows() - y_n = y%get_ncols() + m = desc_a%get_global_rows() ! check vector correctness - call psb_chkvect(x_m,x_n,x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) + call psb_chkvect(m,x%get_ncols(),x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect 1' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_chkvect(y_m,y_n,y%get_nrows(),iy,ijy,desc_a,info,iiy,jjy) + call psb_chkvect(m,y%get_ncols(),y%get_nrows(),iy,ijy,desc_a,info,iiy,jjy) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect 2' @@ -249,8 +245,8 @@ end subroutine psb_daxpby_multivect ! Note: from a functional point of view, X is input, but here ! it's declared INOUT because of the sync() methods. ! -subroutine psb_daxpby_multivect_1(alpha, x, beta, y, desc_a, info) - use psb_base_mod, psb_protect_name => psb_daxpby_multivect_1 +subroutine psb_daxpby_multivect_a(alpha, x, beta, y, desc_a, info) + use psb_base_mod, psb_protect_name => psb_daxpby_multivect_a implicit none real(psb_dpk_), intent(in) :: x(:,:) type(psb_d_multivect_type), intent (inout) :: y @@ -261,7 +257,7 @@ subroutine psb_daxpby_multivect_1(alpha, x, beta, y, desc_a, info) ! locals type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np, me, err_act, iiy, jjy - integer(psb_lpk_) :: iy, ijy, y_m, y_n + integer(psb_lpk_) :: iy, ijy, m character(len=20) :: name, ch_err name='psb_dgeaxpby' @@ -286,10 +282,9 @@ subroutine psb_daxpby_multivect_1(alpha, x, beta, y, desc_a, info) iy = ione ijy = ione - y_m = y%get_nrows() - y_n = y%get_ncols() + m = desc_a%get_global_rows() - call psb_chkvect(y_m,y_n,y%get_nrows(),iy,ijy,desc_a,info,iiy,jjy) + call psb_chkvect(m,y%get_ncols(),y%get_nrows(),iy,ijy,desc_a,info,iiy,jjy) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect 2' @@ -313,7 +308,7 @@ subroutine psb_daxpby_multivect_1(alpha, x, beta, y, desc_a, info) return -end subroutine psb_daxpby_multivect_1 +end subroutine psb_daxpby_multivect_a ! ! Parallel Sparse BLAS version 3.5 diff --git a/base/psblas/psb_ddot.f90 b/base/psblas/psb_ddot.f90 index 586a1935..decb9d9b 100644 --- a/base/psblas/psb_ddot.f90 +++ b/base/psblas/psb_ddot.f90 @@ -158,8 +158,8 @@ function psb_ddot_vect(x, y, desc_a,info,global) result(res) end function psb_ddot_vect ! -! Function: psb_ddot_multivect -! psb_ddot computes the dot product of two distributed vectors, +! Function: psb_ddot_multivect_col_v +! psb_ddot computes the col-by-col dot product of two distributed vectors, ! ! dot := ( X )**C * ( Y ) ! @@ -169,35 +169,33 @@ end function psb_ddot_vect ! y - type(psb_d_multivect_type) The input vector containing the entries of sub( Y ). ! desc_a - type(psb_desc_type). The communication descriptor. ! info - integer. Return code -! t - logical(optional) Whether x is transposed, default: .false. ! global - logical(optional) Whether to perform the global sum, default: .true. ! ! Note: from a functional point of view, X and Y are input, but here ! they are declared INOUT because of the sync() methods. ! ! -function psb_ddot_multivect(x, y, desc_a,info,t,global) result(res) +function psb_ddot_multivect_col_v(x, y, desc_a,info,global) result(res) use psb_desc_mod use psb_d_base_mat_mod use psb_check_mod use psb_error_mod use psb_penv_mod use psb_d_vect_mod - use psb_d_psblas_mod, psb_protect_name => psb_ddot_multivect + use psb_d_psblas_mod, psb_protect_name => psb_ddot_multivect_col_v implicit none real(psb_dpk_), allocatable :: res(:,:) type(psb_d_multivect_type), intent(inout) :: x, y type(psb_desc_type), intent(in) :: desc_a integer(psb_ipk_), intent(out) :: info - logical, optional, intent(in) :: t logical, intent(in), optional :: global ! locals type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np, me, idx, ndm,& - & err_act, iix, jjx, iiy, jjy, i, nr - integer(psb_lpk_) :: ix, ijx, iy, ijy, x_m, x_n, y_m, y_n - logical :: global_, t_ + & err_act, iix, jjx, iiy, jjy, i, j, k, nr + integer(psb_lpk_) :: ix, ijx, iy, ijy, m + logical :: global_ character(len=20) :: name, ch_err name='psb_ddot_multivect' @@ -225,12 +223,6 @@ function psb_ddot_multivect(x, y, desc_a,info,t,global) result(res) goto 9999 endif - if (present(t)) then - t_ = t - else - t_ = .false. - end if - if (present(global)) then global_ = global else @@ -243,16 +235,13 @@ function psb_ddot_multivect(x, y, desc_a,info,t,global) result(res) iy = ione ijy = ione - x_m = x%get_nrows() - x_n = x%get_ncols() - - y_m = y%get_nrows() - y_n = y%get_ncols() + ! TODO + m = desc_a%get_global_rows() ! check vector correctness - call psb_chkvect(x_m,x_n,x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) + call psb_chkvect(m,x%get_ncols(),x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) if (info == psb_success_) & - & call psb_chkvect(y_m,y_n,y%get_nrows(),iy,ijy,desc_a,info,iiy,jjy) + & call psb_chkvect(m,y%get_ncols(),y%get_nrows(),iy,ijy,desc_a,info,iiy,jjy) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect' @@ -269,28 +258,26 @@ function psb_ddot_multivect(x, y, desc_a,info,t,global) result(res) nr = desc_a%get_local_rows() if(nr > 0) then - if (t_) then - allocate(res(x_n,y_n)) - else - allocate(res(x_m,y_n)) - endif + res = x%dot_col(nr,y) - call x%dot(y,res,t_) ! TODO adjust dot_local because overlapped elements are computed more than once - ! if (size(desc_a%ovrlap_elem,1)>0) then - ! if (x%v%is_dev()) call x%sync() - ! if (y%v%is_dev()) call y%sync() - ! do i=1,size(desc_a%ovrlap_elem,1) - ! idx = desc_a%ovrlap_elem(i,1) - ! ndm = desc_a%ovrlap_elem(i,2) - ! res = res - (real(ndm-1)/real(ndm))*(x%v%v(idx)*y%v%v(idx)) - ! end do - ! end if + if (size(desc_a%ovrlap_elem,1)>0) then + if (x%v%is_dev()) call x%sync() + if (y%v%is_dev()) call y%sync() + do j=1,x%get_ncols() + do i=1,size(desc_a%ovrlap_elem,1) + idx = desc_a%ovrlap_elem(i,1) + ndm = desc_a%ovrlap_elem(i,2) + res(idx,j) = res(idx,j) - (real(ndm-1)/real(ndm))*(x%v%v(idx,j)*y%v%v(idx,j)) + end do + end do + end if else + allocate(res(size(x%v%v,2_psb_ipk_),size(y%v%v,2_psb_ipk_))) res = dzero end if - ! compute global sum + ! TODO compute global sum if (global_) call psb_sum(ctxt, res) call psb_erractionrestore(err_act) @@ -300,10 +287,10 @@ function psb_ddot_multivect(x, y, desc_a,info,t,global) result(res) return -end function psb_ddot_multivect +end function psb_ddot_multivect_col_v ! -! Function: psb_ddot_multivect_1 -! psb_ddot computes the dot product of two distributed vectors, +! Function: psb_ddot_multivect_row_a +! psb_ddot computes the row-by-col dot product of two distributed vectors, ! ! dot := ( X )**C * ( Y ) ! @@ -313,36 +300,34 @@ end function psb_ddot_multivect ! y - real(psb_dpk_)(:,:) The input vector containing the entries of sub( Y ). ! desc_a - type(psb_desc_type). The communication descriptor. ! info - integer. Return code -! t - logical(optional) Whether x is transposed, default: .false. ! global - logical(optional) Whether to perform the global sum, default: .true. ! ! Note: from a functional point of view, X and Y are input, but here ! they are declared INOUT because of the sync() methods. ! ! -function psb_ddot_multivect_1(x, y, desc_a,info,t,global) result(res) +function psb_ddot_multivect_row_a(x, y, desc_a, info, global) result(res) use psb_desc_mod use psb_d_base_mat_mod use psb_check_mod use psb_error_mod use psb_penv_mod use psb_d_vect_mod - use psb_d_psblas_mod, psb_protect_name => psb_ddot_multivect_1 + use psb_d_psblas_mod, psb_protect_name => psb_ddot_multivect_row_a implicit none real(psb_dpk_), allocatable :: res(:,:) type(psb_d_multivect_type), intent(inout) :: x real(psb_dpk_), intent(in) :: y(:,:) type(psb_desc_type), intent(in) :: desc_a integer(psb_ipk_), intent(out) :: info - logical, optional, intent(in) :: t logical, intent(in), optional :: global ! locals type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np, me, idx, ndm,& - & err_act, iix, jjx, i, nr - integer(psb_lpk_) :: ix, ijx, iy, ijy, x_m, x_n, y_n - logical :: global_, t_ + & err_act, iix, jjx, i, j, nr + integer(psb_lpk_) :: ix, ijx, iy, ijy, m + logical :: global_ character(len=20) :: name, ch_err name='psb_ddot_multivect' @@ -365,12 +350,6 @@ function psb_ddot_multivect_1(x, y, desc_a,info,t,global) result(res) goto 9999 endif - if (present(t)) then - t_ = t - else - t_ = .false. - end if - if (present(global)) then global_ = global else @@ -380,13 +359,10 @@ function psb_ddot_multivect_1(x, y, desc_a,info,t,global) result(res) ix = ione ijx = ione - x_m = x%get_nrows() - x_n = x%get_ncols() - - y_n = size(y,dim=2) + m = desc_a%get_global_rows() ! check vector correctness - call psb_chkvect(x_m,x_n,x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) + call psb_chkvect(m,x%get_ncols(),x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect' @@ -403,28 +379,25 @@ function psb_ddot_multivect_1(x, y, desc_a,info,t,global) result(res) nr = desc_a%get_local_rows() if(nr > 0) then - if (t_) then - allocate(res(x_n,y_n)) - else - allocate(res(x_m,y_n)) - endif + res = x%dot_row(nr,y) - call x%dot(y,res,t_) ! TODO adjust dot_local because overlapped elements are computed more than once - ! if (size(desc_a%ovrlap_elem,1)>0) then - ! if (x%v%is_dev()) call x%sync() - ! if (y%v%is_dev()) call y%sync() - ! do i=1,size(desc_a%ovrlap_elem,1) - ! idx = desc_a%ovrlap_elem(i,1) - ! ndm = desc_a%ovrlap_elem(i,2) - ! res = res - (real(ndm-1)/real(ndm))*(x%v%v(idx)*y%v%v(idx)) - ! end do - ! end if + if (size(desc_a%ovrlap_elem,1)>0) then + if (x%v%is_dev()) call x%sync() + do j=1,x%get_ncols() + do i=1,size(desc_a%ovrlap_elem,1) + idx = desc_a%ovrlap_elem(i,1) + ndm = desc_a%ovrlap_elem(i,2) + res(idx,j) = res(idx,j) - (real(ndm-1)/real(ndm))*(x%v%v(idx,j)*y(idx,j)) + end do + end do + end if else + allocate(res(nr,size(y,2_psb_ipk_))) res = dzero end if - ! compute global sum + ! TODO compute global sum if (global_) call psb_sum(ctxt, res) call psb_erractionrestore(err_act) @@ -434,7 +407,7 @@ function psb_ddot_multivect_1(x, y, desc_a,info,t,global) result(res) return -end function psb_ddot_multivect_1 +end function psb_ddot_multivect_row_a ! ! Function: psb_ddot ! psb_ddot computes the dot product of two distributed vectors, diff --git a/base/psblas/psb_dnrm2.f90 b/base/psblas/psb_dnrm2.f90 index a3a9c129..76c9c4a2 100644 --- a/base/psblas/psb_dnrm2.f90 +++ b/base/psblas/psb_dnrm2.f90 @@ -400,7 +400,8 @@ function psb_dnrm2_multivect(x, desc_a, info,global) result(res) ! locals type(psb_ctxt_type) :: ctxt - integer(psb_ipk_) :: np, me, err_act, iix, jjx + integer(psb_ipk_) :: np, me, err_act, idx, i, j, iix, jjx, ldx, ndm + real(psb_dpk_) :: dd integer(psb_lpk_) :: ix, jx, m, n logical :: global_ character(len=20) :: name, ch_err @@ -436,10 +437,11 @@ function psb_dnrm2_multivect(x, desc_a, info,global) result(res) ix = 1 jx = 1 - m = x%get_nrows() + m = desc_a%get_global_rows() n = x%get_ncols() + ldx = x%get_nrows() - call psb_chkvect(m,n,x%get_nrows(),ix,jx,desc_a,info,iix,jjx) + call psb_chkvect(m,n,ldx,ix,jx,desc_a,info,iix,jjx) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect' @@ -453,21 +455,25 @@ function psb_dnrm2_multivect(x, desc_a, info,global) result(res) end if if (desc_a%get_local_rows() > 0) then - res = x%nrm2(x%get_ncols()) + res = x%nrm2(desc_a%get_local_rows()) + ! TODO adjust because overlapped elements are computed more than once - ! if (size(desc_a%ovrlap_elem,1)>0) then - ! if (x%is_dev()) call x%sync() - ! do i=1,size(desc_a%ovrlap_elem,1) - ! idx = desc_a%ovrlap_elem(i,1) - ! ndm = desc_a%ovrlap_elem(i,2) - ! dd = dble(ndm-1)/dble(ndm) - ! res = res * sqrt(done - dd*(abs(x%v%v(idx))/res)**2) - ! end do - ! end if + if (size(desc_a%ovrlap_elem,1)>0) then + if (x%v%is_dev()) call x%sync() + do j=1,x%get_ncols() + do i=1,size(desc_a%ovrlap_elem,1) + idx = desc_a%ovrlap_elem(i,1) + ndm = desc_a%ovrlap_elem(i,2) + dd = dble(ndm-1)/dble(ndm) + res = res * sqrt(done - dd*(abs(x%v%v(idx,j))/res)**2) + end do + end do + end if else res = dzero end if + ! TODO if (global_) call psb_nrm2(ctxt,res) call psb_erractionrestore(err_act) diff --git a/base/psblas/psb_dqrfact.f90 b/base/psblas/psb_dqrfact.f90 index 721f9d4e..188898a3 100644 --- a/base/psblas/psb_dqrfact.f90 +++ b/base/psblas/psb_dqrfact.f90 @@ -21,7 +21,7 @@ function psb_dqrfact(x, desc_a, info) result(res) ! locals type(psb_ctxt_type) :: ctxt integer(psb_ipk_) :: np, me, err_act, iix, jjx - integer(psb_lpk_) :: ix, ijx, x_m, x_n + integer(psb_lpk_) :: ix, ijx, m character(len=20) :: name, ch_err name='psb_dgqrfact' @@ -46,10 +46,9 @@ function psb_dqrfact(x, desc_a, info) result(res) ix = ione ijx = ione - x_m = x%get_nrows() - x_n = x%get_ncols() + m = desc_a%get_global_rows() - call psb_chkvect(x_m,x_n,x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) + call psb_chkvect(m,x%get_ncols(),x%get_nrows(),ix,ijx,desc_a,info,iix,jjx) if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_chkvect' @@ -62,9 +61,9 @@ function psb_dqrfact(x, desc_a, info) result(res) call psb_errpush(info,name) end if + ! TODO serial? if(desc_a%get_local_rows() > 0) then - allocate(res(x_n,x_n)) - call x%qr_fact(res, info) + res = x%qr_fact(info) end if call psb_erractionrestore(err_act) diff --git a/base/psblas/psb_dspmm.f90 b/base/psblas/psb_dspmm.f90 index a0365426..535f1c69 100644 --- a/base/psblas/psb_dspmm.f90 +++ b/base/psblas/psb_dspmm.f90 @@ -365,8 +365,8 @@ subroutine psb_dspmv_multivect(alpha, a, x, beta, y, desc_a, info, trans, work,d ncol = desc_a%get_local_cols() lldx = x%get_nrows() lldy = y%get_nrows() - if ((info == 0).and.(lldx 0).and.(me == psb_root_)) call log_header(methdname) @@ -232,6 +228,7 @@ subroutine psb_dbgmres_multivect(a, prec, b, x, eps, desc_a, info, itmax, iter, goto 9999 end if + ! TODO gather su root e poi scatter ! STEP 2: Compute QR_fact(R(0)) beta = psb_geqrfact(v(1),desc_a,info) if (info /= psb_success_) then @@ -243,10 +240,31 @@ subroutine psb_dbgmres_multivect(a, prec, b, x, eps, desc_a, info, itmax, iter, ! STEP 3: Outer loop outer: do j=1,nrep + ! TODO Check convergence + ! if (istop_ == 1) then + ! rni = psb_geamax(v(1),desc_a,info) + ! xni = psb_geamax(x,desc_a,info) + ! errnum = rni + ! errden = (ani*xni+bni) + ! else if (istop_ == 2) then + ! rni = psb_genrm2(v(1),desc_a,info) + ! errnum = rni + ! errden = bn2 + ! endif + ! if (info /= psb_success_) then + ! info=psb_err_from_subroutine_non_ + ! call psb_errpush(info,name) + ! goto 9999 + ! end if + + ! if (errnum <= eps*errden) exit outer + + ! if (itrace_ > 0) call log_conv(methdname,me,itx,itrace_,errnum,errden,deps) + itx = itx + 1 ! Compute j index for H operations - idx_j = (j-1)*n+1 + idx_j = (j-1)*nrhs+1 ! STEP 4: Compute W = AV(j) call psb_spmm(done,a,v(j),dzero,w,desc_a,info,work=aux) @@ -261,31 +279,44 @@ subroutine psb_dbgmres_multivect(a, prec, b, x, eps, desc_a, info, itmax, iter, ! STEP 5: Inner loop inner: do i=1,j + write(*,*) 'PROC ', me, ' LOOOP 1', i + ! Compute i index for H operations - idx_i = (i-1)*n+1 + idx_i = (i-1)*nrhs+1 ! STEP 6: Compute H(i,j) = V(i)_T*W - h(idx_i:idx_i+n_add,idx_j:idx_j+n_add) = psb_gedot(v(i),w,desc_a,info,.true.) + h(idx_i:idx_i+n_add,idx_j:idx_j+n_add) = psb_gedot(v(i),w,desc_a,info) if (info /= psb_success_) then info=psb_err_from_subroutine_non_ call psb_errpush(info,name) goto 9999 end if + write(*,*) 'PROC ', me, ' LOOOP 2', i + ! STEP 7: Compute W = W - V(i)*H(i,j) - call psb_geaxpby(-done,psb_gedot(v(i),h(idx_i:idx_i+n_add,idx_j:idx_j+n_add),desc_a,info),done,w,desc_a,info) + temp = psb_gedot(v(i),h(idx_i:idx_i+n_add,idx_j:idx_j+n_add),desc_a,info) + !call psb_geaxpby(-done,psb_gedot(v(i),h(idx_i:idx_i+n_add,idx_j:idx_j+n_add),desc_a,info),done,w,desc_a,info) + + write(*,*) 'PROC ', me, ' LOOOP 3', i + + call psb_geaxpby(-done,temp,done,w,desc_a,info) if (info /= psb_success_) then info=psb_err_from_subroutine_non_ call psb_errpush(info,name) goto 9999 end if + write(*,*) 'PROC ', me, ' LOOOP 4', i + end do inner ! STEP 8: Compute QR_fact(W) + !write(*,*) 'PROC ', me, ' BBBB ', j + ! Store R in H(j+1,j) - h(idx_j+n:idx_j+n+n_add,idx_j:idx_j+n_add) = psb_geqrfact(w,desc_a,info) + h(idx_j+nrhs:idx_j+nrhs+n_add,idx_j:idx_j+n_add) = psb_geqrfact(w,desc_a,info) if (info /= psb_success_) then info=psb_err_from_subroutine_non_ call psb_errpush(info,name) @@ -300,6 +331,8 @@ subroutine psb_dbgmres_multivect(a, prec, b, x, eps, desc_a, info, itmax, iter, goto 9999 end if + write(*,*) 'PROC ', me, ' AAAA' + end do outer ! STEP 9: Compute Y(m) @@ -310,10 +343,11 @@ subroutine psb_dbgmres_multivect(a, prec, b, x, eps, desc_a, info, itmax, iter, goto 9999 end if + ! TODO Va vene che ci siano altre righe perchè poi si passa localrows ! STEP 10: Compute V = {V(1),...,V(m)} do i=1,nrep+1 - idx = (i-1)*n+1 - v_tot%v%v(:,idx:idx+n_add) = v(i)%v%v(:,1:n) + idx = (i-1)*nrhs+1 + v_tot%v%v(1:n_row,idx:idx+n_add) = v(i)%v%v(1:n_row,1:nrhs) enddo ! STEP 11: X(m) = X(0) + V*Y(m) @@ -360,19 +394,19 @@ contains integer(psb_ipk_) :: m_h, n_h, mn ! Initialize params - m_h = (nrep+1)*n - n_h = nrep*n + m_h = (nrep+1)*nrhs + n_h = nrep*nrhs mn = min(m_h,n_h) - lwork = max(1,mn+max(mn,n)) + lwork = max(1,mn+max(mn,nrhs)) allocate(work(lwork)) ! Compute E1*beta - allocate(beta_e1(m_h,n)) + allocate(beta_e1(m_h,nrhs)) beta_e1 = dzero - beta_e1(1:n,1:n) = beta + beta_e1(1:nrhs,1:nrhs) = beta ! Compute min Frobenius norm - call dgels('N',m_h,n_h,n,h,m_h,beta_e1,m_h,work,lwork,info) + call dgels('N',m_h,n_h,nrhs,h,m_h,beta_e1,m_h,work,lwork,info) deallocate(work,beta_e1) diff --git a/test/block_krylov/getp.f90 b/test/block_krylov/getp.f90 index 344295d1..197cb646 100644 --- a/test/block_krylov/getp.f90 +++ b/test/block_krylov/getp.f90 @@ -128,12 +128,12 @@ contains write(psb_out_unit,'(" ")') write(psb_out_unit,'("Solving matrix : ",a)') mtrx_file - write(psb_out_unit,'("Number of processors : ",i1)') np + write(psb_out_unit,'("Number of processors : ",i3)') np write(psb_out_unit,'("Data distribution : ",a)') part write(psb_out_unit,'("Iterative method : ",a)') kmethd write(psb_out_unit,'("Preconditioner : ",a)') ptype - write(psb_out_unit,'("Number of RHS : ",i1)') nrhs - write(psb_out_unit,'("Number of iterations : ",i1)') itrs + write(psb_out_unit,'("Number of RHS : ",i3)') nrhs + write(psb_out_unit,'("Number of iterations : ",i3)') itrs write(psb_out_unit,'("Storage format : ",a)') afmt(1:3) write(psb_out_unit,'(" ")') else diff --git a/test/block_krylov/psb_dbf_sample.f90 b/test/block_krylov/psb_dbf_sample.f90 index 74216b3d..9b1b05ec 100644 --- a/test/block_krylov/psb_dbf_sample.f90 +++ b/test/block_krylov/psb_dbf_sample.f90 @@ -117,7 +117,7 @@ program psb_dbf_sample b_mv_glob =>aux_b(:,:) else write(psb_out_unit,'("Generating an rhs...")') - write(psb_out_unit,'("Number of RHS: ",i1)') nrhs + write(psb_out_unit,'("Number of RHS: ",i3)') nrhs write(psb_out_unit,'(" ")') call psb_realloc(m,nrhs,aux_b,ircode) if (ircode /= 0) then @@ -197,16 +197,27 @@ program psb_dbf_sample if(iam == psb_root_) then write(psb_out_unit,'("Preconditioner time: ",es12.5)')tprec write(psb_out_unit,'(" ")') + write(psb_out_unit,'("Starting algorithm")') + write(psb_out_unit,'(" ")') end if call psb_barrier(ctxt) t1 = psb_wtime() + call psb_krylov(kmethd,a,prec,b_mv,x_mv,eps,desc_a,info,& & itmax=itmax,iter=iter,err=err,itrace=itrace,& & itrs=itrs,istop=istopc) + call psb_barrier(ctxt) t2 = psb_wtime() - t1 call psb_amx(ctxt,t2) + + if(iam == psb_root_) then + write(psb_out_unit,'("Finished algorithm")') + write(psb_out_unit,'(" ")') + end if + + ! TODO spmm cambia X (che senso ha?) call psb_geaxpby(done,b_mv,dzero,r_mv,desc_a,info) call psb_spmm(-done,a,x_mv,done,r_mv,desc_a,info) @@ -221,6 +232,10 @@ program psb_dbf_sample call psb_sum(ctxt,descsize) call psb_sum(ctxt,precsize) + call psb_gather(x_mv_glob,x_mv,desc_a,info,root=psb_root_) + if (info == psb_success_) call psb_gather(r_mv_glob,r_mv,desc_a,info,root=psb_root_) + if (info /= psb_success_) goto 9999 + if (iam == psb_root_) then call prec%descr(info) write(psb_out_unit,'(" ")') @@ -240,12 +255,12 @@ program psb_dbf_sample write(psb_out_unit,'("Residual norm 2: ",es12.5)')resmx write(psb_out_unit,'("Residual norm inf: ",es12.5)')resmxp write(psb_out_unit,'(" ")') + ! TODO + ! do i=1,m + ! write(psb_out_unit,993) i, x_mv_glob(i,:), r_mv_glob(i,:), b_mv_glob(i,:) + ! enddo end if - call psb_gather(x_mv_glob,x_mv,desc_a,info,root=psb_root_) - if (info == psb_success_) call psb_gather(r_mv_glob,r_mv,desc_a,info,root=psb_root_) - if (info /= psb_success_) goto 9999 - 998 format(i8,4(2x,g20.14)) 993 format(i6,4(1x,e12.6))