test/fileread/cf_sample.f90
 test/fileread/df_sample.f90
 test/fileread/sf_sample.f90
 test/fileread/zf_sample.f90
 util/Makefile
 util/psb_c_mat_dist_impl.f90
 util/psb_d_mat_dist_impl.f90
 util/psb_mat_dist_mod.f90
 util/psb_s_mat_dist_impl.f90
 util/psb_z_mat_dist_impl.f90

Reworking matdist.
psblas-3.4-maint
Salvatore Filippone 9 years ago
parent 926830cab0
commit 0f5b146522

@@ -187,7 +187,7 @@ program cf_sample
ivg(i) = ipv(1)
enddo
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else if (ipart == 2) then
if (iam == psb_root_) then
@@ -201,12 +201,12 @@ program cf_sample
call distr_mtpart(psb_root_,ictxt)
call getv_mtpart(ivg)
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else
if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")')
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block)
end if
call psb_geall(x_col,desc_a,info)
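The same interface change repeats in the df_sample, sf_sample and zf_sample hunks below: info moves up to follow desc_a, and the right-hand side becomes the keyword pair b_glob=/b=, leaving room for the new optional x_glob/x pair. A minimal caller-side sketch of the new convention, assuming the usual driver setup (matrix read into aux_a and RHS into b_col_glob on the root process, as the drivers do) and using the double-precision variant:

  program matdist_sketch
    use psb_base_mod
    use psb_util_mod
    implicit none
    type(psb_dspmat_type)       :: aux_a, a
    type(psb_desc_type)         :: desc_a
    type(psb_d_vect_type)       :: b_col
    real(psb_dpk_), allocatable :: b_col_glob(:)
    integer(psb_ipk_)           :: ictxt, iam, np, info

    call psb_init(ictxt)
    call psb_info(ictxt, iam, np)
    ! ... read the matrix into aux_a and the RHS into b_col_glob
    !     on the root process, as the drivers above do ...
    ! info now follows desc_a; the RHS pair is passed by keyword
    ! (both or neither) and distributed internally via psb_scatter.
    call psb_matdist(aux_a, a, ictxt, desc_a, info, &
         & b_glob=b_col_glob, b=b_col, fmt='CSR', parts=part_block)
    call psb_exit(ictxt)
  end program matdist_sketch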

@@ -194,7 +194,7 @@ program df_sample
ivg(i) = ipv(1)
enddo
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else if (ipart == 2) then
if (iam == psb_root_) then
@@ -208,12 +208,12 @@ program df_sample
call distr_mtpart(psb_root_,ictxt)
call getv_mtpart(ivg)
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else
if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")')
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block)
end if
call psb_geall(x_col,desc_a,info)

@@ -188,7 +188,7 @@ program sf_sample
ivg(i) = ipv(1)
enddo
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else if (ipart == 2) then
if (iam == psb_root_) then
@@ -202,12 +202,12 @@ program sf_sample
call distr_mtpart(psb_root_,ictxt)
call getv_mtpart(ivg)
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else
if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")')
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block)
end if
call psb_geall(x_col,desc_a,info)

@@ -187,7 +187,7 @@ program zf_sample
ivg(i) = ipv(1)
enddo
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else if (ipart == 2) then
if (iam == psb_root_) then
@@ -201,12 +201,12 @@ program zf_sample
call distr_mtpart(psb_root_,ictxt)
call getv_mtpart(ivg)
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg)
else
if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")')
call psb_matdist(aux_a, a, ictxt, &
& desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block)
& desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block)
end if
call psb_geall(x_col,desc_a,info)

@@ -8,6 +8,7 @@ HERE=.
BASEOBJS= psb_blockpart_mod.o psb_metispart_mod.o \
psb_hbio_mod.o psb_mmio_mod.o psb_mat_dist_mod.o \
psb_s_mat_dist_mod.o psb_d_mat_dist_mod.o psb_c_mat_dist_mod.o psb_z_mat_dist_mod.o \
psb_renum_mod.o psb_gps_mod.o psb_d_genpde_mod.o psb_s_genpde_mod.o
IMPLOBJS= psb_s_hbio_impl.o psb_d_hbio_impl.o \
psb_c_hbio_impl.o psb_z_hbio_impl.o \
@@ -37,6 +38,7 @@ $(HERE)/$(LIBNAME): $(OBJS)
$(OBJS): $(INCDIR)/$(BASEMODNAME)$(.mod)
psb_util_mod.o: $(BASEOBJS)
psb_metispart_mod.o: metis_int.o
psb_mat_dist_mod.o: psb_s_mat_dist_mod.o psb_d_mat_dist_mod.o psb_c_mat_dist_mod.o psb_z_mat_dist_mod.o
$(IMPLOBJS): $(BASEOBJS)

@@ -29,25 +29,17 @@
!!$ POSSIBILITY OF SUCH DAMAGE.
!!$
!!$
subroutine cmatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
& info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! type(psb_cspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! type(psb_cspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
@@ -68,19 +60,21 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
! complex(psb_spk_), optional :: b_glob(:)
! on entry: RHS
!
! type(psb_c_vect_type), optional :: b
! on exit : this will contain the local right hand side.
!
! complex(psb_spk_), optional :: x_glob(:)
! on entry: initial guess
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! type(psb_c_vect_type), optional :: x
! on exit : this will contain the local initial guess.
!
! integer(psb_ipk_), optional :: inroot
@@ -93,34 +87,36 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
! parameters
type(psb_cspmat_type) :: a_glob
complex(psb_spk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_cspmat_type) :: a
type(psb_c_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=5), optional :: fmt
complex(psb_spk_), optional :: b_glob(:)
type(psb_c_vect_type), optional :: b
complex(psb_spk_), optional :: x_glob(:)
type(psb_c_vect_type), optional :: x
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_ipk_) :: np, iam, length_row
integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: i_count, j_count,&
& k_count, root, liwork, nrow, ncol, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5)
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
complex(psb_spk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
complex(psb_spk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'mat_distf'
name = 'psb_c_mat_dist'
call psb_erractionsave(err_act)
! executable statements
@@ -150,6 +146,18 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
if ( count((/present(b_glob),present(b)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" b_glob, b")
goto 9999
endif
if ( count((/present(x_glob),present(x)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" x_glob, x")
goto 9999
endif
! broadcast information to other processors
call psb_bcast(ictxt,nrow, root)
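The checks added in this hunk enforce a both-or-neither contract on each optional pair: count() over an array constructor of present() results says how many of the pair were supplied, and exactly one is rejected as psb_err_optional_arg_pair_. A self-contained sketch of the same pattern, with made-up names:

  program paired_optionals_demo
    implicit none
    real :: g(4) = 1.0
    call dist(b_glob=g)   ! rejected: only one of the pair supplied
    call dist()           ! accepted: neither supplied
  contains
    subroutine dist(b_glob, b)
      real, optional :: b_glob(:), b(:)
      ! count() over the logical array constructor gives the number
      ! of arguments present; exactly one of a pair is an error.
      if (count([present(b_glob), present(b)]) == 1) then
        print *, 'error: b_glob and b must be supplied together'
        return
      end if
      print *, 'ok'
    end subroutine dist
  end program paired_optionals_demo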
@@ -184,19 +192,10 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np))
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psspall'
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geall(b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psdsall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
@@ -212,24 +211,31 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, length_row)
if (length_row == 1) then
j_count = i_count
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwork, length_row)
if (length_row /= 1 ) exit
call parts(j_count,nrow,np,iwork, np_sharing)
if (np_sharing /= 1 ) exit
if (iwork(1) /= iproc ) exit
end do
end if
else
length_row = 1
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
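np_sharing is simply the nv output of the user's parts routine, i.e. the number of processes that own a given global row; the loop above only batches consecutive rows while a single, identical owner is reported. A standalone sketch of a partition routine that produces such overlap, with plain default integers standing in for integer(psb_ipk_) and a made-up name:

  ! Block partition with one row of overlap at each internal block
  ! boundary: boundary rows report nv = 2 owners, so matdist sends
  ! them to both processes (the np_sharing > 1 path).
  subroutine part_block_ovrl(global_indx, n, np, pv, nv)
    implicit none
    integer, intent(in)  :: global_indx, n, np
    integer, intent(out) :: nv
    integer, intent(out) :: pv(*)
    integer :: nb, owner
    nb    = (n + np - 1)/np        ! rows per block
    owner = (global_indx - 1)/nb   ! 0-based owning process
    nv    = 1
    pv(1) = owner
    if (owner > 0 .and. mod(global_indx - 1, nb) == 0) then
      nv    = 2                    ! first row of a block is also
      pv(2) = owner - 1            ! owned by the previous process
    end if
  end subroutine part_block_ovrl

Because the owner list is a pure function of the inputs, every process asking about the same index gets the same answer, which is the coherence the NOTE above requires to avoid deadlock.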
@@ -238,25 +244,27 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
end do
end if
if (length_row == 1) then
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
ll = ll + nz
end do
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
@@ -266,25 +274,19 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_ins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
else if (iam /= root) then
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
@@ -303,7 +305,6 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
@@ -312,93 +313,10 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),&
& b_glob(i_count:i_count+nnr-1),b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
i_count = j_count
else
! here processors are counted 1..np
do j_count = 1, length_row
k_count = iwork(j_count)
if (iam == root) then
ll = 0
do i= i_count, i_count
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
if (k_count == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,ll,k_count)
call psb_snd(ictxt,irow(1:ll),k_count)
call psb_snd(ictxt,icol(1:ll),k_count)
call psb_snd(ictxt,val(1:ll),k_count)
call psb_snd(ictxt,b_glob(i_count),k_count)
call psb_rcv(ictxt,ll,k_count)
endif
else if (iam /= root) then
if (k_count == iam) then
call psb_rcv(ictxt,ll,root)
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
end do
i_count = i_count + 1
endif
i_count = j_count
end do
call psb_barrier(ictxt)
@@ -429,13 +347,8 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
call psb_geasb(b,desc_a,info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psdsasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
deallocate(val,irow,icol,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
@@ -444,6 +357,27 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
goto 9999
end if
if (present(b_glob).and.present(b)) then
call psb_scatter(b_glob,b,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
if (present(x_glob).and.present(x)) then
call psb_scatter(x_glob,x,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
deallocate(iwork)
if (iam == root) write (*, fmt = *) 'end matdist'
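The corresponding deletions are the point of the rework: all the row-by-row psb_geins/psb_snd/psb_rcv traffic for the right-hand side disappears, replaced by one collective scatter per optional pair once desc_a has been assembled (psb_scatter presumably needs the assembled descriptor for its global-to-local mapping, hence its position after psb_cdasb). Stripped of error handling, the new tail of the routine is just:

  ! One collective call per pair, only if the caller supplied it.
  if (present(b_glob) .and. present(b)) &
       & call psb_scatter(b_glob, b, desc_a, info, root=root)
  if (present(x_glob) .and. present(x)) &
       & call psb_scatter(x_glob, x, desc_a, info, root=root)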
@@ -454,4 +388,4 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine cmatdist
end subroutine psb_cmatdist

@@ -29,25 +29,17 @@
!!$ POSSIBILITY OF SUCH DAMAGE.
!!$
!!$
subroutine dmatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
& info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! type(psb_dspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! type(psb_dspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
@@ -68,19 +60,21 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
! on entry: RHS
!
! type(psb_d_vect_type), optional :: b
! on exit : this will contain the local right hand side.
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! real(psb_dpk_), optional :: x_glob(:)
! on entry: initial guess
!
! type(psb_d_vect_type), optional :: x
! on exit : this will contain the local initial guess.
!
! integer(psb_ipk_), optional :: inroot
@@ -93,34 +87,36 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
! parameters
type(psb_dspmat_type) :: a_glob
real(psb_dpk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_dspmat_type) :: a
type(psb_d_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=5), optional :: fmt
real(psb_dpk_), optional :: b_glob(:)
type(psb_d_vect_type), optional :: b
real(psb_dpk_), optional :: x_glob(:)
type(psb_d_vect_type), optional :: x
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_ipk_) :: np, iam, length_row
integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: i_count, j_count,&
& k_count, root, liwork, nrow, ncol, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5)
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
real(psb_dpk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
real(psb_dpk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'mat_distf'
name = 'psb_d_mat_dist'
call psb_erractionsave(err_act)
! executable statements
@@ -150,6 +146,18 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
if ( count((/present(b_glob),present(b)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" b_glob, b")
goto 9999
endif
if ( count((/present(x_glob),present(x)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" x_glob, x")
goto 9999
endif
! broadcast information to other processors
call psb_bcast(ictxt,nrow, root)
@@ -166,7 +174,7 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
endif
if (iam == root) then
write (*, fmt = *) 'start matdist',root, size(iwork),&
&nrow, ncol, nnzero,nrhs, use_parts, use_v
&nrow, ncol, nnzero,nrhs
endif
if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
@@ -175,7 +183,6 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
else
info = -1
end if
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_cdall'
@@ -185,19 +192,10 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np))
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psspall'
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geall(b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psdsall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
@@ -213,24 +211,31 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, length_row)
if (length_row == 1) then
j_count = i_count
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwork, length_row)
if (length_row /= 1 ) exit
call parts(j_count,nrow,np,iwork, np_sharing)
if (np_sharing /= 1 ) exit
if (iwork(1) /= iproc ) exit
end do
end if
else
length_row = 1
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
@@ -239,26 +244,28 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
end do
end if
if (length_row == 1) then
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
ll = ll + nz
end do
!!$ write(0,*) 'mat_dist: sending rows ',i_count,j_count-1,' to proc',iproc, ll
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
@@ -267,25 +274,19 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_ins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
else if (iam /= root) then
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
@@ -304,7 +305,6 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
@@ -313,96 +313,12 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),&
& b_glob(i_count:i_count+nnr-1),b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
i_count = j_count
else
! here processors are counted 1..np
do j_count = 1, length_row
k_count = iwork(j_count)
if (iam == root) then
ll = 0
do i= i_count, i_count
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
if (k_count == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,ll,k_count)
call psb_snd(ictxt,irow(1:ll),k_count)
call psb_snd(ictxt,icol(1:ll),k_count)
call psb_snd(ictxt,val(1:ll),k_count)
call psb_snd(ictxt,b_glob(i_count),k_count)
call psb_rcv(ictxt,ll,k_count)
endif
else if (iam /= root) then
if (k_count == iam) then
call psb_rcv(ictxt,ll,root)
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
end do
i_count = i_count + 1
endif
i_count = j_count
end do
call psb_barrier(ictxt)
t0 = psb_wtime()
call psb_cdasb(desc_a,info)
@@ -431,13 +347,8 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
call psb_geasb(b,desc_a,info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psdsasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
deallocate(val,irow,icol,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
@@ -446,6 +357,27 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
goto 9999
end if
if (present(b_glob).and.present(b)) then
call psb_scatter(b_glob,b,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
if (present(x_glob).and.present(x)) then
call psb_scatter(x_glob,x,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
deallocate(iwork)
if (iam == root) write (*, fmt = *) 'end matdist'
@@ -456,4 +388,4 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine dmatdist
end subroutine psb_dmatdist

@@ -30,321 +30,10 @@
!!$
!!$
module psb_mat_dist_mod
use psb_base_mod, only : psb_ipk_, psb_spk_, psb_dpk_, psb_desc_type, psb_parts, &
& psb_sspmat_type, psb_cspmat_type, psb_dspmat_type, psb_zspmat_type, &
& psb_s_base_sparse_mat, psb_c_base_sparse_mat, &
& psb_d_base_sparse_mat, psb_z_base_sparse_mat, &
& psb_s_vect_type, psb_d_vect_type, psb_c_vect_type, psb_z_vect_type
interface psb_matdist
subroutine smatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! on exit : this will contain the local right hand side.
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_sspmat_type, psb_desc_type, psb_spk_,&
& psb_s_base_sparse_mat, psb_s_vect_type, psb_parts
implicit none
! parameters
type(psb_sspmat_type) :: a_glob
real(psb_spk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_sspmat_type) :: a
type(psb_s_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine smatdist
subroutine dmatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! on exit : this will contain the local right hand side.
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_dspmat_type, psb_dpk_, psb_desc_type,&
& psb_d_base_sparse_mat, psb_d_vect_type, psb_parts
implicit none
! parameters
type(psb_dspmat_type) :: a_glob
real(psb_dpk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_dspmat_type) :: a
type(psb_d_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine dmatdist
subroutine cmatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! on exit : this will contain the local right hand side.
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_cspmat_type, psb_spk_, psb_desc_type,&
& psb_c_base_sparse_mat, psb_c_vect_type, psb_parts
implicit none
! parameters
type(psb_cspmat_type) :: a_glob
complex(psb_spk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_cspmat_type) :: a
type(psb_c_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine cmatdist
subroutine zmatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! on exit : this will contain the local right hand side.
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_zspmat_type, psb_dpk_, psb_desc_type,&
& psb_z_base_sparse_mat, psb_z_vect_type, psb_parts
implicit none
! parameters
type(psb_zspmat_type) :: a_glob
complex(psb_dpk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_zspmat_type) :: a
type(psb_z_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine zmatdist
end interface
use psb_s_mat_dist_mod
use psb_d_mat_dist_mod
use psb_c_mat_dist_mod
use psb_z_mat_dist_mod
end module psb_mat_dist_mod
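With the interface bodies gone, the umbrella module reduces to four re-exports; this is also why the Makefile hunk above makes psb_mat_dist_mod.o depend on the per-precision objects. Assuming each per-precision module attaches its specific routine to the generic name, as the drivers above imply, caller code is unchanged:

  use psb_mat_dist_mod   ! re-exports psb_{s,d,c,z}_mat_dist_mod
  ! ... declarations as in the cf/df/sf/zf drivers above ...
  call psb_matdist(aux_a, a, ictxt, desc_a, info, &
       & b_glob=b_col_glob, b=b_col, fmt='CSR', parts=part_block)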

@@ -29,25 +29,17 @@
!!$ POSSIBILITY OF SUCH DAMAGE.
!!$
!!$
subroutine smatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
& info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! type(psb_sspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! type(psb_sspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
@@ -68,19 +60,21 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
! real(psb_spk_), optional :: b_glob(:)
! on entry: RHS
!
! type(psb_s_vect_type), optional :: b
! on exit : this will contain the local right hand side.
!
! real(psb_spk_), optional :: x_glob(:)
! on entry: initial guess
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! type(psb_s_vect_type), optional :: x
! on exit : this will contain the local initial guess.
!
! integer(psb_ipk_), optional :: inroot
@@ -93,34 +87,36 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
! parameters
type(psb_sspmat_type) :: a_glob
real(psb_spk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_sspmat_type) :: a
type(psb_s_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=5), optional :: fmt
real(psb_spk_), optional :: b_glob(:)
type(psb_s_vect_type), optional :: b
real(psb_spk_), optional :: x_glob(:)
type(psb_s_vect_type), optional :: x
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_ipk_) :: np, iam, length_row
integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: i_count, j_count,&
& k_count, root, liwork, nrow, ncol, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5)
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
real(psb_spk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
real(psb_spk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'mat_distf'
name = 'psb_s_mat_dist'
call psb_erractionsave(err_act)
! executable statements
@@ -150,6 +146,18 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
if ( count((/present(b_glob),present(b)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" b_glob, b")
goto 9999
endif
if ( count((/present(x_glob),present(x)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" x_glob, x")
goto 9999
endif
! broadcast information to other processors
call psb_bcast(ictxt,nrow, root)
@@ -184,19 +192,10 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np))
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psspall'
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geall(b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psdsall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
@@ -212,24 +211,31 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, length_row)
if (length_row == 1) then
j_count = i_count
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwork, length_row)
if (length_row /= 1 ) exit
call parts(j_count,nrow,np,iwork, np_sharing)
if (np_sharing /= 1 ) exit
if (iwork(1) /= iproc ) exit
end do
end if
else
length_row = 1
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
@@ -238,25 +244,27 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
end do
end if
if (length_row == 1) then
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
ll = ll + nz
end do
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
@@ -266,25 +274,19 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_ins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
else if (iam /= root) then
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
@@ -303,7 +305,6 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
@@ -312,93 +313,10 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),&
& b_glob(i_count:i_count+nnr-1),b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
i_count = j_count
else
! here processors are counted 1..np
do j_count = 1, length_row
k_count = iwork(j_count)
if (iam == root) then
ll = 0
do i= i_count, i_count
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
if (k_count == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,ll,k_count)
call psb_snd(ictxt,irow(1:ll),k_count)
call psb_snd(ictxt,icol(1:ll),k_count)
call psb_snd(ictxt,val(1:ll),k_count)
call psb_snd(ictxt,b_glob(i_count),k_count)
call psb_rcv(ictxt,ll,k_count)
endif
else if (iam /= root) then
if (k_count == iam) then
call psb_rcv(ictxt,ll,root)
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
end do
i_count = i_count + 1
endif
i_count = j_count
end do
call psb_barrier(ictxt)
@@ -429,13 +347,8 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
call psb_geasb(b,desc_a,info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psdsasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
deallocate(val,irow,icol,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
@@ -444,6 +357,27 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
goto 9999
end if
if (present(b_glob).and.present(b)) then
call psb_scatter(b_glob,b,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
if (present(x_glob).and.present(x)) then
call psb_scatter(x_glob,x,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
deallocate(iwork)
if (iam == root) write (*, fmt = *) 'end matdist'
@@ -454,4 +388,4 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine smatdist
end subroutine psb_smatdist

@@ -29,25 +29,17 @@
!!$ POSSIBILITY OF SUCH DAMAGE.
!!$
!!$
subroutine zmatdist(a_glob, a, ictxt, desc_a,&
& b_glob, b, info, parts, v, inroot,fmt,mold)
subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
& info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold)
!
! a utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(d_spmat) :: a_glob
! type(psb_zspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
! a%fida == 'csr'
! a%aspk for coefficient values
! a%ia1 for column indices
! a%ia2 for row pointers
! a%m for number of global matrix rows
! a%k for number of global matrix columns
! on exit : undefined, with unassociated pointers.
!
! type(d_spmat) :: a
! on entry: fresh variable.
! type(psb_zspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
@@ -68,19 +60,21 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: blacs context.
! on exit : unchanged.
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on entry: fresh variable.
! on exit : the updated array descriptor
!
! real(psb_dpk_), optional :: b_glob(:)
! on entry: this contains right hand side.
! on exit :
! complex(psb_dpk_), optional :: b_glob(:)
! on entry: RHS
!
! type(psb_z_vect_type), optional :: b
! on exit : this will contain the local right hand side.
!
! complex(psb_dpk_), optional :: x_glob(:)
! on entry: initial guess
!
! real(psb_dpk_), allocatable, optional :: b(:)
! on entry: fresh variable.
! type(psb_z_vect_type), optional :: x
! on exit : this will contain the local initial guess.
!
! integer(psb_ipk_), optional :: inroot
@@ -93,34 +87,36 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
! parameters
type(psb_zspmat_type) :: a_glob
complex(psb_dpk_) :: b_glob(:)
integer(psb_ipk_) :: ictxt
type(psb_zspmat_type) :: a
type(psb_z_vect_type) :: b
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=5), optional :: fmt
complex(psb_dpk_), optional :: b_glob(:)
type(psb_z_vect_type), optional :: b
complex(psb_dpk_), optional :: x_glob(:)
type(psb_z_vect_type), optional :: x
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_ipk_) :: np, iam, length_row
integer(psb_ipk_) :: np, iam, np_sharing
integer(psb_ipk_) :: i_count, j_count,&
& k_count, root, liwork, nrow, ncol, nnzero, nrhs,&
& i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5)
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
complex(psb_dpk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
integer(psb_ipk_), allocatable :: iwork(:)
integer(psb_ipk_), allocatable :: irow(:),icol(:)
complex(psb_dpk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'mat_distf'
name = 'psb_z_mat_dist'
call psb_erractionsave(err_act)
! executable statements
@@ -150,6 +146,18 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
if ( count((/present(b_glob),present(b)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" b_glob, b")
goto 9999
endif
if ( count((/present(x_glob),present(x)/)) == 1 ) then
info=psb_err_optional_arg_pair_
call psb_errpush(info,name,a_err=" x_glob, x")
goto 9999
endif
! broadcast information to other processors
call psb_bcast(ictxt,nrow, root)
@@ -184,19 +192,10 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np))
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psspall'
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geall(b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_psdsall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
@@ -212,24 +211,31 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, length_row)
if (length_row == 1) then
j_count = i_count
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwork, length_row)
if (length_row /= 1 ) exit
call parts(j_count,nrow,np,iwork, np_sharing)
if (np_sharing /= 1 ) exit
if (iwork(1) /= iproc ) exit
end do
end if
else
length_row = 1
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
@@ -238,25 +244,27 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
end do
end if
if (length_row == 1) then
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
ll = ll + nz
end do
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
@@ -266,25 +274,19 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_ins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
else if (iam /= root) then
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
@@ -303,7 +305,6 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
@@ -312,93 +313,10 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),&
& b_glob(i_count:i_count+nnr-1),b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
i_count = j_count
else
! here processors are counted 1..np
do j_count = 1, length_row
k_count = iwork(j_count)
if (iam == root) then
ll = 0
do i= i_count, i_count
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
if (k_count == iam) then
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,ll,k_count)
call psb_snd(ictxt,irow(1:ll),k_count)
call psb_snd(ictxt,icol(1:ll),k_count)
call psb_snd(ictxt,val(1:ll),k_count)
call psb_snd(ictxt,b_glob(i_count),k_count)
call psb_rcv(ictxt,ll,k_count)
endif
else if (iam /= root) then
if (k_count == iam) then
call psb_rcv(ictxt,ll,root)
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_rcv(ictxt,b_glob(i_count),root)
call psb_snd(ictxt,ll,root)
call psb_spins(ll,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),&
& b,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psdsins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
endif
end do
i_count = i_count + 1
endif
i_count = j_count
end do
call psb_barrier(ictxt)
@@ -429,13 +347,8 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
call psb_geasb(b,desc_a,info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psdsasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
deallocate(val,irow,icol,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
@@ -444,6 +357,27 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
goto 9999
end if
if (present(b_glob).and.present(b)) then
call psb_scatter(b_glob,b,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
if (present(x_glob).and.present(x)) then
call psb_scatter(x_glob,x,desc_a,info,root=root)
if (info /= 0) then
info=psb_err_from_subroutine_
ch_err='psb_scatter'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
end if
deallocate(iwork)
if (iam == root) write (*, fmt = *) 'end matdist'
@@ -454,4 +388,4 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine zmatdist
end subroutine psb_zmatdist
