diff --git a/test/fileread/cf_sample.f90 b/test/fileread/cf_sample.f90 index 317d1bd8..3694a763 100644 --- a/test/fileread/cf_sample.f90 +++ b/test/fileread/cf_sample.f90 @@ -187,7 +187,7 @@ program cf_sample ivg(i) = ipv(1) enddo call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (ipart == 2) then if (iam == psb_root_) then @@ -201,12 +201,12 @@ program cf_sample call distr_mtpart(psb_root_,ictxt) call getv_mtpart(ivg) call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")') call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block) end if call psb_geall(x_col,desc_a,info) diff --git a/test/fileread/df_sample.f90 b/test/fileread/df_sample.f90 index d39cbbeb..39ed1b88 100644 --- a/test/fileread/df_sample.f90 +++ b/test/fileread/df_sample.f90 @@ -194,7 +194,7 @@ program df_sample ivg(i) = ipv(1) enddo call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (ipart == 2) then if (iam == psb_root_) then @@ -208,12 +208,12 @@ program df_sample call distr_mtpart(psb_root_,ictxt) call getv_mtpart(ivg) call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")') call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block) end if call psb_geall(x_col,desc_a,info) diff --git a/test/fileread/sf_sample.f90 b/test/fileread/sf_sample.f90 index 6de05332..e35fba3b 100644 --- a/test/fileread/sf_sample.f90 +++ b/test/fileread/sf_sample.f90 @@ -188,7 +188,7 @@ program sf_sample ivg(i) = ipv(1) enddo call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (ipart == 2) then if (iam == psb_root_) then @@ -202,12 +202,12 @@ program sf_sample call distr_mtpart(psb_root_,ictxt) call getv_mtpart(ivg) call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")') call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block) end if call psb_geall(x_col,desc_a,info) diff --git a/test/fileread/zf_sample.f90 b/test/fileread/zf_sample.f90 index 166b753b..a1e39e44 100644 --- a/test/fileread/zf_sample.f90 +++ b/test/fileread/zf_sample.f90 @@ -187,7 +187,7 @@ program zf_sample ivg(i) = ipv(1) enddo call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (ipart == 2) then if (iam == psb_root_) then @@ -201,12 +201,12 @@ program zf_sample call distr_mtpart(psb_root_,ictxt) call getv_mtpart(ivg) call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,v=ivg) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,v=ivg) else if (iam == psb_root_) write(psb_out_unit,'("Partition type: block subroutine")') call psb_matdist(aux_a, a, ictxt, & - & desc_a,b_col_glob,b_col,info,fmt=afmt,parts=part_block) + & desc_a,info,b_glob=b_col_glob,b=b_col,fmt=afmt,parts=part_block) end if call psb_geall(x_col,desc_a,info) diff --git a/util/Makefile b/util/Makefile index a1ccc085..5d939109 100644 --- a/util/Makefile +++ b/util/Makefile @@ -8,6 +8,7 @@ HERE=. BASEOBJS= psb_blockpart_mod.o psb_metispart_mod.o \ psb_hbio_mod.o psb_mmio_mod.o psb_mat_dist_mod.o \ + psb_s_mat_dist_mod.o psb_d_mat_dist_mod.o psb_c_mat_dist_mod.o psb_z_mat_dist_mod.o \ psb_renum_mod.o psb_gps_mod.o psb_d_genpde_mod.o psb_s_genpde_mod.o IMPLOBJS= psb_s_hbio_impl.o psb_d_hbio_impl.o \ psb_c_hbio_impl.o psb_z_hbio_impl.o \ @@ -37,6 +38,7 @@ $(HERE)/$(LIBNAME): $(OBJS) $(OBJS): $(INCDIR)/$(BASEMODNAME)$(.mod) psb_util_mod.o: $(BASEOBJS) psb_metispart_mod.o: metis_int.o +psb_mat_dist_mod.o: psb_s_mat_dist_mod.o psb_d_mat_dist_mod.o psb_c_mat_dist_mod.o psb_z_mat_dist_mod.o $(IMPLOBJS): $(BASEOBJS) diff --git a/util/psb_c_mat_dist_impl.f90 b/util/psb_c_mat_dist_impl.f90 index 693a3c9c..8b6ba3d8 100644 --- a/util/psb_c_mat_dist_impl.f90 +++ b/util/psb_c_mat_dist_impl.f90 @@ -29,25 +29,17 @@ !!$ POSSIBILITY OF SUCH DAMAGE. !!$ !!$ -subroutine cmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) +subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& + & info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using ! sparse matrix subroutines. ! - ! type(d_spmat) :: a_glob + ! type(psb_cspmat) :: a_glob ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. ! - ! type(d_spmat) :: a - ! on entry: fresh variable. + ! type(psb_cspmat_type) :: a ! on exit : this will contain the local sparse matrix. ! ! interface parts @@ -68,19 +60,21 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& ! distribution. ! ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. + ! on entry: the PSBLAS parallel environment context. ! ! type (desc_type) :: desc_a - ! on entry: fresh variable. ! on exit : the updated array descriptor ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : + ! complex(psb_spk_), optional :: b_glob(:) + ! on entry: RHS + ! + ! type(psb_c_vect_type), optional :: b + ! on exit : this will contain the local right hand side. ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. + ! complex(psb_spk_), optional :: x_glob(:) + ! on entry: initial guess + ! + ! type(psb_c_vect_type), optional :: x ! on exit : this will contain the local right hand side. ! ! integer(psb_ipk_), optional :: inroot @@ -93,34 +87,36 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& ! parameters type(psb_cspmat_type) :: a_glob - complex(psb_spk_) :: b_glob(:) integer(psb_ipk_) :: ictxt type(psb_cspmat_type) :: a - type(psb_c_vect_type) :: b type(psb_desc_type) :: desc_a integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt + complex(psb_spk_), optional :: b_glob(:) + type(psb_c_vect_type), optional :: b + complex(psb_spk_), optional :: x_glob(:) + type(psb_c_vect_type), optional :: x + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt class(psb_c_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) ! local variables logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam, length_row + integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: i_count, j_count,& & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - complex(psb_spk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 + integer(psb_ipk_), allocatable :: iwork(:) + integer(psb_ipk_), allocatable :: irow(:),icol(:) + complex(psb_spk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 character(len=20) :: name, ch_err info = psb_success_ err = 0 - name = 'mat_distf' + name = 'psb_c_mat_dist' call psb_erractionsave(err_act) ! executable statements @@ -150,7 +146,19 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=" v, parts") goto 9999 endif + if ( count((/present(b_glob),present(b)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" b_glob, b") + goto 9999 + endif + if ( count((/present(x_glob),present(x)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" x_glob, x") + goto 9999 + endif + + ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -184,19 +192,10 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) if(info /= psb_success_) then info=psb_err_from_subroutine_ - ch_err='psb_psspall' + ch_err='psb_spall' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) allocate(val(isize),irow(isize),icol(isize),stat=info) @@ -212,24 +211,31 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& do while (i_count <= nrow) if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then iproc = iwork(1) do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit + call parts(j_count,nrow,np,iwork, np_sharing) + if (np_sharing /= 1 ) exit if (iwork(1) /= iproc ) exit end do end if else - length_row = 1 + np_sharing = 1 j_count = i_count iproc = v(i_count) - + iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit @@ -238,26 +244,28 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& end do end if - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' end if - ll = ll + nz - end do + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -266,25 +274,19 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if else call psb_snd(ictxt,nnr,iproc) call psb_snd(ictxt,ll,iproc) call psb_snd(ictxt,irow(1:ll),iproc) call psb_snd(ictxt,icol(1:ll),iproc) call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) call psb_rcv(ictxt,ll,iproc) endif - else if (iam /= root) then - + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) if (iproc == iam) then call psb_rcv(ictxt,nnr,root) call psb_rcv(ictxt,ll,root) @@ -298,12 +300,11 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - + endif call psb_rcv(ictxt,irow(1:ll),root) call psb_rcv(ictxt,icol(1:ll),root) call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) call psb_snd(ictxt,ll,root) call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -312,93 +313,10 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif endif end do - i_count = i_count + 1 endif + i_count = j_count end do call psb_barrier(ictxt) @@ -429,13 +347,8 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 end if - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if + + deallocate(val,irow,icol,stat=info) if(info /= psb_success_)then info=psb_err_from_subroutine_ @@ -444,6 +357,27 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& goto 9999 end if + if (present(b_glob).and.present(b)) then + call psb_scatter(b_glob,b,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + + + if (present(x_glob).and.present(x)) then + call psb_scatter(x_glob,x,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + deallocate(iwork) if (iam == root) write (*, fmt = *) 'end matdist' @@ -454,4 +388,4 @@ subroutine cmatdist(a_glob, a, ictxt, desc_a,& return -end subroutine cmatdist +end subroutine psb_cmatdist diff --git a/util/psb_d_mat_dist_impl.f90 b/util/psb_d_mat_dist_impl.f90 index 9e2296b9..719fd407 100644 --- a/util/psb_d_mat_dist_impl.f90 +++ b/util/psb_d_mat_dist_impl.f90 @@ -29,25 +29,17 @@ !!$ POSSIBILITY OF SUCH DAMAGE. !!$ !!$ -subroutine dmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) +subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& + & info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using ! sparse matrix subroutines. ! - ! type(d_spmat) :: a_glob + ! type(psb_dspmat) :: a_glob ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. ! - ! type(d_spmat) :: a - ! on entry: fresh variable. + ! type(psb_dspmat_type) :: a ! on exit : this will contain the local sparse matrix. ! ! interface parts @@ -68,19 +60,21 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& ! distribution. ! ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. + ! on entry: the PSBLAS parallel environment context. ! ! type (desc_type) :: desc_a - ! on entry: fresh variable. ! on exit : the updated array descriptor ! ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : + ! on entry: RHS + ! + ! type(psb_d_vect_type), optional :: b + ! on exit : this will contain the local right hand side. ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. + ! real(psb_dpk_), optional :: x_glob(:) + ! on entry: initial guess + ! + ! type(psb_d_vect_type), optional :: x ! on exit : this will contain the local right hand side. ! ! integer(psb_ipk_), optional :: inroot @@ -93,34 +87,36 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& ! parameters type(psb_dspmat_type) :: a_glob - real(psb_dpk_) :: b_glob(:) integer(psb_ipk_) :: ictxt type(psb_dspmat_type) :: a - type(psb_d_vect_type) :: b type(psb_desc_type) :: desc_a integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt + real(psb_dpk_), optional :: b_glob(:) + type(psb_d_vect_type), optional :: b + real(psb_dpk_), optional :: x_glob(:) + type(psb_d_vect_type), optional :: x + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt class(psb_d_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) ! local variables logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam, length_row + integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: i_count, j_count,& & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - real(psb_dpk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 + integer(psb_ipk_), allocatable :: iwork(:) + integer(psb_ipk_), allocatable :: irow(:),icol(:) + real(psb_dpk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 character(len=20) :: name, ch_err info = psb_success_ err = 0 - name = 'mat_distf' + name = 'psb_d_mat_dist' call psb_erractionsave(err_act) ! executable statements @@ -150,7 +146,19 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=" v, parts") goto 9999 endif + if ( count((/present(b_glob),present(b)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" b_glob, b") + goto 9999 + endif + if ( count((/present(x_glob),present(x)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" x_glob, x") + goto 9999 + endif + + ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -166,7 +174,7 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& endif if (iam == root) then write (*, fmt = *) 'start matdist',root, size(iwork),& - &nrow, ncol, nnzero,nrhs, use_parts, use_v + &nrow, ncol, nnzero,nrhs endif if (use_parts) then call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) @@ -175,7 +183,6 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& else info = -1 end if - if(info /= psb_success_) then info=psb_err_from_subroutine_ ch_err='psb_cdall' @@ -185,20 +192,11 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) if(info /= psb_success_) then info=psb_err_from_subroutine_ - ch_err='psb_psspall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' + ch_err='psb_spall' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) allocate(val(isize),irow(isize),icol(isize),stat=info) if(info /= psb_success_) then @@ -213,24 +211,31 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& do while (i_count <= nrow) if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then iproc = iwork(1) do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit + call parts(j_count,nrow,np,iwork, np_sharing) + if (np_sharing /= 1 ) exit if (iwork(1) /= iproc ) exit end do end if else - length_row = 1 + np_sharing = 1 j_count = i_count iproc = v(i_count) - + iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit @@ -239,26 +244,28 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& end do end if - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' end if - ll = ll + nz - end do -!!$ write(0,*) 'mat_dist: sending rows ',i_count,j_count-1,' to proc',iproc, ll + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -267,25 +274,19 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if else call psb_snd(ictxt,nnr,iproc) call psb_snd(ictxt,ll,iproc) call psb_snd(ictxt,irow(1:ll),iproc) call psb_snd(ictxt,icol(1:ll),iproc) call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) call psb_rcv(ictxt,ll,iproc) endif - else if (iam /= root) then - + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) if (iproc == iam) then call psb_rcv(ictxt,nnr,root) call psb_rcv(ictxt,ll,root) @@ -299,12 +300,11 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - + endif call psb_rcv(ictxt,irow(1:ll),root) call psb_rcv(ictxt,icol(1:ll),root) call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) call psb_snd(ictxt,ll,root) call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -313,96 +313,12 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif endif end do - i_count = i_count + 1 endif + i_count = j_count end do - call psb_barrier(ictxt) t0 = psb_wtime() call psb_cdasb(desc_a,info) @@ -431,13 +347,8 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 end if - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if + + deallocate(val,irow,icol,stat=info) if(info /= psb_success_)then info=psb_err_from_subroutine_ @@ -446,6 +357,27 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& goto 9999 end if + if (present(b_glob).and.present(b)) then + call psb_scatter(b_glob,b,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + + + if (present(x_glob).and.present(x)) then + call psb_scatter(x_glob,x,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + deallocate(iwork) if (iam == root) write (*, fmt = *) 'end matdist' @@ -456,4 +388,4 @@ subroutine dmatdist(a_glob, a, ictxt, desc_a,& return -end subroutine dmatdist +end subroutine psb_dmatdist diff --git a/util/psb_mat_dist_mod.f90 b/util/psb_mat_dist_mod.f90 index 183b3246..e563cfb6 100644 --- a/util/psb_mat_dist_mod.f90 +++ b/util/psb_mat_dist_mod.f90 @@ -30,321 +30,10 @@ !!$ !!$ module psb_mat_dist_mod - use psb_base_mod, only : psb_ipk_, psb_spk_, psb_dpk_, psb_desc_type, psb_parts, & - & psb_sspmat_type, psb_cspmat_type, psb_dspmat_type, psb_zspmat_type, & - & psb_s_base_sparse_mat, psb_c_base_sparse_mat, & - & psb_d_base_sparse_mat, psb_z_base_sparse_mat, & - & psb_s_vect_type, psb_d_vect_type, psb_c_vect_type, psb_z_vect_type - interface psb_matdist - subroutine smatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - import :: psb_ipk_, psb_sspmat_type, psb_desc_type, psb_spk_,& - & psb_s_base_sparse_mat, psb_s_vect_type, psb_parts - implicit none - - ! parameters - type(psb_sspmat_type) :: a_glob - real(psb_spk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_sspmat_type) :: a - type(psb_s_vect_type) :: b - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=*), optional :: fmt - class(psb_s_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - - end subroutine smatdist - subroutine dmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - import :: psb_ipk_, psb_dspmat_type, psb_dpk_, psb_desc_type,& - & psb_d_base_sparse_mat, psb_d_vect_type, psb_parts - implicit none - - ! parameters - type(psb_dspmat_type) :: a_glob - real(psb_dpk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_dspmat_type) :: a - type(psb_d_vect_type) :: b - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=*), optional :: fmt - class(psb_d_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - - end subroutine dmatdist - - subroutine cmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - import :: psb_ipk_, psb_cspmat_type, psb_spk_, psb_desc_type,& - & psb_c_base_sparse_mat, psb_c_vect_type, psb_parts - implicit none - - ! parameters - type(psb_cspmat_type) :: a_glob - complex(psb_spk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_cspmat_type) :: a - type(psb_c_vect_type) :: b - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=*), optional :: fmt - class(psb_c_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - end subroutine cmatdist - - subroutine zmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - import :: psb_ipk_, psb_zspmat_type, psb_dpk_, psb_desc_type,& - & psb_z_base_sparse_mat, psb_z_vect_type, psb_parts - implicit none - - ! parameters - type(psb_zspmat_type) :: a_glob - complex(psb_dpk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_zspmat_type) :: a - type(psb_z_vect_type) :: b - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=*), optional :: fmt - class(psb_z_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) - end subroutine zmatdist - end interface + use psb_s_mat_dist_mod + use psb_d_mat_dist_mod + use psb_c_mat_dist_mod + use psb_z_mat_dist_mod end module psb_mat_dist_mod diff --git a/util/psb_s_mat_dist_impl.f90 b/util/psb_s_mat_dist_impl.f90 index 0968fd7b..b24f00ab 100644 --- a/util/psb_s_mat_dist_impl.f90 +++ b/util/psb_s_mat_dist_impl.f90 @@ -29,25 +29,17 @@ !!$ POSSIBILITY OF SUCH DAMAGE. !!$ !!$ -subroutine smatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) +subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& + & info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using ! sparse matrix subroutines. ! - ! type(d_spmat) :: a_glob + ! type(psb_sspmat) :: a_glob ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. ! - ! type(d_spmat) :: a - ! on entry: fresh variable. + ! type(psb_sspmat_type) :: a ! on exit : this will contain the local sparse matrix. ! ! interface parts @@ -68,19 +60,21 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& ! distribution. ! ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. + ! on entry: the PSBLAS parallel environment context. ! ! type (desc_type) :: desc_a - ! on entry: fresh variable. ! on exit : the updated array descriptor ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : + ! real(psb_spk_), optional :: b_glob(:) + ! on entry: RHS + ! + ! type(psb_s_vect_type), optional :: b + ! on exit : this will contain the local right hand side. ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. + ! real(psb_spk_), optional :: x_glob(:) + ! on entry: initial guess + ! + ! type(psb_s_vect_type), optional :: x ! on exit : this will contain the local right hand side. ! ! integer(psb_ipk_), optional :: inroot @@ -93,34 +87,36 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& ! parameters type(psb_sspmat_type) :: a_glob - real(psb_spk_) :: b_glob(:) integer(psb_ipk_) :: ictxt type(psb_sspmat_type) :: a - type(psb_s_vect_type) :: b type(psb_desc_type) :: desc_a integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt + real(psb_spk_), optional :: b_glob(:) + type(psb_s_vect_type), optional :: b + real(psb_spk_), optional :: x_glob(:) + type(psb_s_vect_type), optional :: x + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt class(psb_s_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) ! local variables logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam, length_row + integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: i_count, j_count,& & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - real(psb_spk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 + integer(psb_ipk_), allocatable :: iwork(:) + integer(psb_ipk_), allocatable :: irow(:),icol(:) + real(psb_spk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 character(len=20) :: name, ch_err info = psb_success_ err = 0 - name = 'mat_distf' + name = 'psb_s_mat_dist' call psb_erractionsave(err_act) ! executable statements @@ -150,7 +146,19 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=" v, parts") goto 9999 endif + if ( count((/present(b_glob),present(b)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" b_glob, b") + goto 9999 + endif + if ( count((/present(x_glob),present(x)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" x_glob, x") + goto 9999 + endif + + ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -184,19 +192,10 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) if(info /= psb_success_) then info=psb_err_from_subroutine_ - ch_err='psb_psspall' + ch_err='psb_spall' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) allocate(val(isize),irow(isize),icol(isize),stat=info) @@ -212,24 +211,31 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& do while (i_count <= nrow) if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then iproc = iwork(1) do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit + call parts(j_count,nrow,np,iwork, np_sharing) + if (np_sharing /= 1 ) exit if (iwork(1) /= iproc ) exit end do end if else - length_row = 1 + np_sharing = 1 j_count = i_count iproc = v(i_count) - + iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit @@ -238,26 +244,28 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& end do end if - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' end if - ll = ll + nz - end do + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -266,25 +274,19 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if else call psb_snd(ictxt,nnr,iproc) call psb_snd(ictxt,ll,iproc) call psb_snd(ictxt,irow(1:ll),iproc) call psb_snd(ictxt,icol(1:ll),iproc) call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) call psb_rcv(ictxt,ll,iproc) endif - else if (iam /= root) then - + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) if (iproc == iam) then call psb_rcv(ictxt,nnr,root) call psb_rcv(ictxt,ll,root) @@ -298,12 +300,11 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - + endif call psb_rcv(ictxt,irow(1:ll),root) call psb_rcv(ictxt,icol(1:ll),root) call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) call psb_snd(ictxt,ll,root) call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -312,93 +313,10 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif endif end do - i_count = i_count + 1 endif + i_count = j_count end do call psb_barrier(ictxt) @@ -429,13 +347,8 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 end if - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if + + deallocate(val,irow,icol,stat=info) if(info /= psb_success_)then info=psb_err_from_subroutine_ @@ -444,6 +357,27 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& goto 9999 end if + if (present(b_glob).and.present(b)) then + call psb_scatter(b_glob,b,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + + + if (present(x_glob).and.present(x)) then + call psb_scatter(x_glob,x,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + deallocate(iwork) if (iam == root) write (*, fmt = *) 'end matdist' @@ -454,4 +388,4 @@ subroutine smatdist(a_glob, a, ictxt, desc_a,& return -end subroutine smatdist +end subroutine psb_smatdist diff --git a/util/psb_z_mat_dist_impl.f90 b/util/psb_z_mat_dist_impl.f90 index b4648f8c..16152ced 100644 --- a/util/psb_z_mat_dist_impl.f90 +++ b/util/psb_z_mat_dist_impl.f90 @@ -29,25 +29,17 @@ !!$ POSSIBILITY OF SUCH DAMAGE. !!$ !!$ -subroutine zmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) +subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& + & info, b_glob, b, x_glob, x, parts, v, inroot,fmt,mold) ! ! an utility subroutine to distribute a matrix among processors ! according to a user defined data distribution, using ! sparse matrix subroutines. ! - ! type(d_spmat) :: a_glob + ! type(psb_zspmat) :: a_glob ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. ! - ! type(d_spmat) :: a - ! on entry: fresh variable. + ! type(psb_zspmat_type) :: a ! on exit : this will contain the local sparse matrix. ! ! interface parts @@ -68,19 +60,21 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& ! distribution. ! ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. + ! on entry: the PSBLAS parallel environment context. ! ! type (desc_type) :: desc_a - ! on entry: fresh variable. ! on exit : the updated array descriptor ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : + ! complex(psb_dpk_), optional :: b_glob(:) + ! on entry: RHS + ! + ! type(psb_z_vect_type), optional :: b + ! on exit : this will contain the local right hand side. ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. + ! complex(psb_dpk_), optional :: x_glob(:) + ! on entry: initial guess + ! + ! type(psb_z_vect_type), optional :: x ! on exit : this will contain the local right hand side. ! ! integer(psb_ipk_), optional :: inroot @@ -93,34 +87,36 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& ! parameters type(psb_zspmat_type) :: a_glob - complex(psb_dpk_) :: b_glob(:) integer(psb_ipk_) :: ictxt type(psb_zspmat_type) :: a - type(psb_z_vect_type) :: b type(psb_desc_type) :: desc_a integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt + complex(psb_dpk_), optional :: b_glob(:) + type(psb_z_vect_type), optional :: b + complex(psb_dpk_), optional :: x_glob(:) + type(psb_z_vect_type), optional :: x + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt class(psb_z_base_sparse_mat), optional :: mold - procedure(psb_parts), optional :: parts - integer(psb_ipk_), optional :: v(:) + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) ! local variables logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam, length_row + integer(psb_ipk_) :: np, iam, np_sharing integer(psb_ipk_) :: i_count, j_count,& & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - complex(psb_dpk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 + integer(psb_ipk_), allocatable :: iwork(:) + integer(psb_ipk_), allocatable :: irow(:),icol(:) + complex(psb_dpk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 character(len=20) :: name, ch_err info = psb_success_ err = 0 - name = 'mat_distf' + name = 'psb_z_mat_dist' call psb_erractionsave(err_act) ! executable statements @@ -150,7 +146,19 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=" v, parts") goto 9999 endif + if ( count((/present(b_glob),present(b)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" b_glob, b") + goto 9999 + endif + if ( count((/present(x_glob),present(x)/)) == 1 ) then + info=psb_err_optional_arg_pair_ + call psb_errpush(info,name,a_err=" x_glob, x") + goto 9999 + endif + + ! broadcast informations to other processors call psb_bcast(ictxt,nrow, root) call psb_bcast(ictxt,ncol, root) @@ -184,19 +192,10 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) if(info /= psb_success_) then info=psb_err_from_subroutine_ - ch_err='psb_psspall' + ch_err='psb_spall' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) allocate(val(isize),irow(isize),icol(isize),stat=info) @@ -212,24 +211,31 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& do while (i_count <= nrow) if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then iproc = iwork(1) do j_count = j_count + 1 if (j_count-i_count >= nb) exit if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit + call parts(j_count,nrow,np,iwork, np_sharing) + if (np_sharing /= 1 ) exit if (iwork(1) /= iproc ) exit end do end if else - length_row = 1 + np_sharing = 1 j_count = i_count iproc = v(i_count) - + iwork(1:np_sharing) = iproc do j_count = j_count + 1 if (j_count-i_count >= nb) exit @@ -238,26 +244,28 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& end do end if - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' end if - ll = ll + nz - end do + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -266,25 +274,19 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if else call psb_snd(ictxt,nnr,iproc) call psb_snd(ictxt,ll,iproc) call psb_snd(ictxt,irow(1:ll),iproc) call psb_snd(ictxt,icol(1:ll),iproc) call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) call psb_rcv(ictxt,ll,iproc) endif - else if (iam /= root) then - + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) if (iproc == iam) then call psb_rcv(ictxt,nnr,root) call psb_rcv(ictxt,ll,root) @@ -298,12 +300,11 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - + endif call psb_rcv(ictxt,irow(1:ll),root) call psb_rcv(ictxt,icol(1:ll),root) call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) call psb_snd(ictxt,ll,root) call psb_spins(ll,irow,icol,val,a,desc_a,info) if(info /= psb_success_) then @@ -312,93 +313,10 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(ione,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif endif end do - i_count = i_count + 1 endif + i_count = j_count end do call psb_barrier(ictxt) @@ -429,13 +347,8 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 end if - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if + + deallocate(val,irow,icol,stat=info) if(info /= psb_success_)then info=psb_err_from_subroutine_ @@ -444,6 +357,27 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& goto 9999 end if + if (present(b_glob).and.present(b)) then + call psb_scatter(b_glob,b,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + + + if (present(x_glob).and.present(x)) then + call psb_scatter(x_glob,x,desc_a,info,root=root) + if (info /= 0) then + info=psb_err_from_subroutine_ + ch_err='psb_scatter' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + end if + deallocate(iwork) if (iam == root) write (*, fmt = *) 'end matdist' @@ -454,4 +388,4 @@ subroutine zmatdist(a_glob, a, ictxt, desc_a,& return -end subroutine zmatdist +end subroutine psb_zmatdist