diff --git a/base/internals/psb_indx_map_fnd_owner.F90 b/base/internals/psb_indx_map_fnd_owner.F90 index 9777c9f3..c1addb48 100644 --- a/base/internals/psb_indx_map_fnd_owner.F90 +++ b/base/internals/psb_indx_map_fnd_owner.F90 @@ -70,7 +70,7 @@ subroutine psb_indx_map_fnd_owner(idx,iprc,idxmap,info) & sdsz(:),sdidx(:), rvsz(:), rvidx(:),answers(:,:),idxsrch(:,:) integer(psb_ipk_) :: i,n_row,n_col,err_act,ih,icomm,hsize,ip,isz,k,j,& - & last_ih, last_j, nv + & last_ih, last_j, nv, mglob, nresp integer(psb_ipk_) :: ictxt,np,me logical, parameter :: gettime=.false. real(psb_dpk_) :: t0, t1, t2, t3, t4, tamx, tidx @@ -82,12 +82,13 @@ subroutine psb_indx_map_fnd_owner(idx,iprc,idxmap,info) ictxt = idxmap%get_ctxt() icomm = idxmap%get_mpic() + mglob = idxmap%get_gr() n_row = idxmap%get_lr() n_col = idxmap%get_lc() - + call psb_info(ictxt, me, np) - + if (np == -1) then info = psb_err_context_error_ call psb_errpush(info,name) @@ -104,169 +105,197 @@ subroutine psb_indx_map_fnd_owner(idx,iprc,idxmap,info) end if nv = size(idx) - ! - ! The basic idea is very simple. - ! First we collect (to all) all the requests. - Allocate(hidx(np+1),hsz(np),& - & sdsz(0:np-1),sdidx(0:np-1),& - & rvsz(0:np-1),rvidx(0:np-1),& - & stat=info) + call psb_realloc(nv,iprc,info) if (info /= psb_success_) then - call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + call psb_errpush(psb_err_from_subroutine_,name,a_err='psb_realloc') goto 9999 end if - hsz = 0 - hsz(me+1) = nv - call psb_amx(ictxt,hsz) - hidx(1) = 0 - do i=1, np - hidx(i+1) = hidx(i) + hsz(i) - end do - hsize = hidx(np+1) - Allocate(helem(hsize),hproc(hsize),stat=info) - if (info /= psb_success_) then - call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') - goto 9999 - end if + if (associated(idxmap%parts)) then + ! Use function shortcut +!!$ write(0,*) me,trim(name),' indxmap%parts shortcut' + Allocate(hidx(np), stat=info) + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + goto 9999 + end if + do i=1, nv + call idxmap%parts(idx(i),mglob,np,hidx,nresp) + if (nresp > 0) then + iprc(i) = hidx(1) + else + iprc(i) = -1 + end if + end do - if (gettime) then - t3 = psb_wtime() - end if + else if (allocated(idxmap%tempvg)) then +!!$ write(0,*) me,trim(name),' indxmap%tempvg shortcut' + ! Use temporary vector + do i=1, nv + iprc(i) = idxmap%tempvg(idx(i)) + end do - call mpi_allgatherv(idx,hsz(me+1),psb_mpi_integer,& - & hproc,hsz,hidx,psb_mpi_integer,& - & icomm,info) - if (gettime) then - tamx = psb_wtime() - t3 - end if + else - ! Second, we figure out locally whether we own the indices (whoever is - ! asking for them). - if (gettime) then - t3 = psb_wtime() - end if + ! + ! The basic idea is very simple. + ! First we collect (to all) all the requests. + Allocate(hidx(np+1),hsz(np),& + & sdsz(0:np-1),sdidx(0:np-1),& + & rvsz(0:np-1),rvidx(0:np-1),& + & stat=info) + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + goto 9999 + end if - call idxmap%g2l(hproc(1:hsize),helem(1:hsize),info,owned=.true.) - if (gettime) then - tidx = psb_wtime()-t3 - end if - if (info == psb_err_iarray_outside_bounds_) info = psb_success_ - if (info /= psb_success_) then - call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_idx_cnv') - goto 9999 - end if + hsz = 0 + hsz(me+1) = nv + call psb_amx(ictxt,hsz) + hidx(1) = 0 + do i=1, np + hidx(i+1) = hidx(i) + hsz(i) + end do + hsize = hidx(np+1) + Allocate(helem(hsize),hproc(hsize),stat=info) + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + goto 9999 + end if - ! Third: we build the answers for those indices we own, - ! with a section for each process asking. - hidx = hidx +1 - j = 0 - do ip = 0, np-1 - sdidx(ip) = j - sdsz(ip) = 0 - do i=hidx(ip+1), hidx(ip+1+1)-1 - if ((0 < helem(i)).and. (helem(i) <= n_row)) then - j = j + 1 - hproc(j) = hproc(i) - sdsz(ip) = sdsz(ip) + 1 - end if + if (gettime) then + t3 = psb_wtime() + end if + + call mpi_allgatherv(idx,hsz(me+1),psb_mpi_integer,& + & hproc,hsz,hidx,psb_mpi_integer,& + & icomm,info) + if (gettime) then + tamx = psb_wtime() - t3 + end if + + ! Second, we figure out locally whether we own the indices (whoever is + ! asking for them). + if (gettime) then + t3 = psb_wtime() + end if + + call idxmap%g2l(hproc(1:hsize),helem(1:hsize),info,owned=.true.) + if (gettime) then + tidx = psb_wtime()-t3 + end if + if (info == psb_err_iarray_outside_bounds_) info = psb_success_ + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_idx_cnv') + goto 9999 + end if + + ! Third: we build the answers for those indices we own, + ! with a section for each process asking. + hidx = hidx +1 + j = 0 + do ip = 0, np-1 + sdidx(ip) = j + sdsz(ip) = 0 + do i=hidx(ip+1), hidx(ip+1+1)-1 + if ((0 < helem(i)).and. (helem(i) <= n_row)) then + j = j + 1 + hproc(j) = hproc(i) + sdsz(ip) = sdsz(ip) + 1 + end if + end do end do - end do - if (gettime) then - t3 = psb_wtime() - end if + if (gettime) then + t3 = psb_wtime() + end if - ! Collect all the answers with alltoallv (need sizes) - call mpi_alltoall(sdsz,1,psb_mpi_integer,rvsz,1,mpi_integer,icomm,info) + ! Collect all the answers with alltoallv (need sizes) + call mpi_alltoall(sdsz,1,psb_mpi_integer,rvsz,1,mpi_integer,icomm,info) - isz = sum(rvsz) + isz = sum(rvsz) - allocate(answers(isz,2),idxsrch(nv,2),stat=info) - if (info /= psb_success_) then - call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') - goto 9999 - end if - j = 0 - do ip=0, np-1 - rvidx(ip) = j - j = j + rvsz(ip) - end do - call mpi_alltoallv(hproc,sdsz,sdidx,psb_mpi_integer,& - & answers(:,1),rvsz,rvidx,psb_mpi_integer,& - & icomm,info) - if (gettime) then - tamx = psb_wtime() - t3 + tamx - end if - j = 1 - do ip = 0,np-1 - do k=1,rvsz(ip) - answers(j,2) = ip - j = j + 1 + allocate(answers(isz,2),idxsrch(nv,2),stat=info) + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + goto 9999 + end if + j = 0 + do ip=0, np-1 + rvidx(ip) = j + j = j + rvsz(ip) end do - end do - ! Sort the answers and the requests, so we can - ! match them efficiently - call psb_msort(answers(:,1),ix=answers(:,2),& - & flag=psb_sort_keep_idx_) - idxsrch(1:nv,1) = idx(1:nv) - call psb_msort(idxsrch(1:nv,1),ix=idxsrch(1:nv,2)) - - ! Now extract the answers for our local query - call psb_realloc(nv,iprc,info) - if (info /= psb_success_) then - call psb_errpush(psb_err_from_subroutine_,name,a_err='psb_realloc') - goto 9999 - end if - last_ih = -1 - last_j = -1 - j = 1 - do i=1, nv - ih = idxsrch(i,1) - if (ih == last_ih) then - iprc(idxsrch(i,2)) = answers(last_j,2) - else - - do - if (j > size(answers,1)) then - ! Last resort attempt. - j = psb_ibsrch(ih,size(answers,1),answers(:,1)) - if (j == -1) then - write(psb_err_unit,*) me,'psi_fnd_owner: searching for ',ih, & - & 'not found : ',size(answers,1),':',answers(:,1) - info = psb_err_internal_error_ - call psb_errpush(psb_err_internal_error_,name,a_err='out bounds srch ih') - goto 9999 + call mpi_alltoallv(hproc,sdsz,sdidx,psb_mpi_integer,& + & answers(:,1),rvsz,rvidx,psb_mpi_integer,& + & icomm,info) + if (gettime) then + tamx = psb_wtime() - t3 + tamx + end if + j = 1 + do ip = 0,np-1 + do k=1,rvsz(ip) + answers(j,2) = ip + j = j + 1 + end do + end do + ! Sort the answers and the requests, so we can + ! match them efficiently + call psb_msort(answers(:,1),ix=answers(:,2),& + & flag=psb_sort_keep_idx_) + idxsrch(1:nv,1) = idx(1:nv) + call psb_msort(idxsrch(1:nv,1),ix=idxsrch(1:nv,2)) + + ! Now extract the answers for our local query + last_ih = -1 + last_j = -1 + j = 1 + do i=1, nv + ih = idxsrch(i,1) + if (ih == last_ih) then + iprc(idxsrch(i,2)) = answers(last_j,2) + else + + do + if (j > size(answers,1)) then + ! Last resort attempt. + j = psb_ibsrch(ih,size(answers,1),answers(:,1)) + if (j == -1) then + write(psb_err_unit,*) me,'psi_fnd_owner: searching for ',ih, & + & 'not found : ',size(answers,1),':',answers(:,1) + info = psb_err_internal_error_ + call psb_errpush(psb_err_internal_error_,name,a_err='out bounds srch ih') + goto 9999 + end if end if - end if - if (answers(j,1) == ih) exit - if (answers(j,1) > ih) then - k = j - j = psb_ibsrch(ih,k,answers(1:k,1)) - if (j == -1) then - write(psb_err_unit,*) me,'psi_fnd_owner: searching for ',ih, & - & 'not found : ',size(answers,1),':',answers(:,1) - info = psb_err_internal_error_ - call psb_errpush(psb_err_internal_error_,name,a_err='out bounds srch ih') - goto 9999 + if (answers(j,1) == ih) exit + if (answers(j,1) > ih) then + k = j + j = psb_ibsrch(ih,k,answers(1:k,1)) + if (j == -1) then + write(psb_err_unit,*) me,'psi_fnd_owner: searching for ',ih, & + & 'not found : ',size(answers,1),':',answers(:,1) + info = psb_err_internal_error_ + call psb_errpush(psb_err_internal_error_,name,a_err='out bounds srch ih') + goto 9999 + end if end if - end if - j = j + 1 - end do - ! Note that the answers here are given in order - ! of sending process, so we are implicitly getting - ! the max process index in case of overlap. - last_ih = ih - do - last_j = j - iprc(idxsrch(i,2)) = answers(j,2) - j = j + 1 - if (j > size(answers,1)) exit - if (answers(j,1) /= ih) exit - end do - end if - end do + j = j + 1 + end do + ! Note that the answers here are given in order + ! of sending process, so we are implicitly getting + ! the max process index in case of overlap. + last_ih = ih + do + last_j = j + iprc(idxsrch(i,2)) = answers(j,2) + j = j + 1 + if (j > size(answers,1)) exit + if (answers(j,1) /= ih) exit + end do + end if + end do + end if if (gettime) then call psb_barrier(ictxt) diff --git a/base/modules/psb_base_tools_mod.f90 b/base/modules/psb_base_tools_mod.f90 index 8a05ff94..939acf12 100644 --- a/base/modules/psb_base_tools_mod.f90 +++ b/base/modules/psb_base_tools_mod.f90 @@ -423,14 +423,14 @@ module psb_cd_tools_mod interface psb_cdall subroutine psb_cdall(ictxt, desc, info,mg,ng,parts,vg,vl,flag,nl,repl, globalcheck) - import :: psb_ipk_, psb_desc_type + import :: psb_ipk_, psb_desc_type, psb_parts implicit None - include 'parts.fh' - integer(psb_ipk_), intent(in) :: mg,ng,ictxt, vg(:), vl(:),nl - integer(psb_ipk_), intent(in) :: flag - logical, intent(in) :: repl, globalcheck - integer(psb_ipk_), intent(out) :: info - type(psb_desc_type), intent(out) :: desc + procedure(psb_parts) :: parts + integer(psb_ipk_), intent(in) :: mg,ng,ictxt, vg(:), vl(:),nl + integer(psb_ipk_), intent(in) :: flag + logical, intent(in) :: repl, globalcheck + integer(psb_ipk_), intent(out) :: info + type(psb_desc_type), intent(out) :: desc optional :: mg,ng,parts,vg,vl,flag,nl,repl, globalcheck end subroutine psb_cdall diff --git a/base/modules/psb_desc_const_mod.f90 b/base/modules/psb_desc_const_mod.f90 index bc165346..cb449291 100644 --- a/base/modules/psb_desc_const_mod.f90 +++ b/base/modules/psb_desc_const_mod.f90 @@ -119,4 +119,12 @@ module psb_desc_const_mod integer(psb_ipk_), parameter :: psb_ovrlp_elem_to_=2, psb_ovrlp_elem_=0 integer(psb_ipk_), parameter :: psb_n_dom_ovr_=1 + interface + subroutine psb_parts(glob_index,nrow,np,pv,nv) + import :: psb_ipk_ + integer(psb_ipk_), intent (in) :: glob_index,np,nrow + integer(psb_ipk_), intent (out) :: nv, pv(*) + end subroutine psb_parts + end interface + end module psb_desc_const_mod diff --git a/base/modules/psb_indx_map_mod.f90 b/base/modules/psb_indx_map_mod.f90 index f6c09c65..4e64e6ef 100644 --- a/base/modules/psb_indx_map_mod.f90 +++ b/base/modules/psb_indx_map_mod.f90 @@ -95,7 +95,11 @@ module psb_indx_map_mod integer(psb_ipk_) :: global_cols = -1 integer(psb_ipk_) :: local_rows = -1 integer(psb_ipk_) :: local_cols = -1 - + + procedure(psb_parts), nopass, pointer :: parts => null() + integer(psb_ipk_), allocatable :: tempvg(:) + integer(psb_ipk_), allocatable :: oracle(:,:) + contains procedure, pass(idxmap) :: get_state => base_get_state diff --git a/base/tools/psb_cd_inloc.f90 b/base/tools/psb_cd_inloc.f90 index c57ec03c..30603606 100644 --- a/base/tools/psb_cd_inloc.f90 +++ b/base/tools/psb_cd_inloc.f90 @@ -289,7 +289,6 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) & stat=info) if (info == psb_success_) then desc%lprm(1) = 0 -!!$ desc%matrix_data(:) = 0 end if if (info /= psb_success_) then info=psb_err_alloc_request_ @@ -299,11 +298,6 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) endif temp_ovrlap(:) = -1 -!!$ desc%matrix_data(psb_m_) = m -!!$ desc%matrix_data(psb_n_) = n -!!$ ! This has to be set BEFORE any call to SET_BLD -!!$ desc%matrix_data(psb_ctxt_) = ictxt -!!$ call psb_get_mpicomm(ictxt,desc%matrix_data(psb_mpi_c_)) if (debug_level >= psb_debug_ext_) & @@ -371,31 +365,6 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) goto 9999 endif -!!$ ! set fields in desc%MATRIX_DATA.... -!!$ desc%matrix_data(psb_n_row_) = loc_row -!!$ desc%matrix_data(psb_n_col_) = loc_row - -!!$ call psb_realloc(max(1,loc_row/2),desc%halo_index, info) -!!$ if (info == psb_success_) call psb_realloc(1,desc%ext_index, info) -!!$ if (info /= psb_success_) then -!!$ info=psb_err_from_subroutine_ -!!$ call psb_errpush(info,name,a_err='psb_realloc') -!!$ Goto 9999 -!!$ end if -!!$ desc%matrix_data(psb_pnt_h_) = 1 -!!$ desc%halo_index(:) = -1 -!!$ desc%ext_index(:) = -1 -!!$ -!!$ if (debug_level >= psb_debug_ext_) & -!!$ & write(debug_unit,*) me,' ',trim(name),': end' -!!$ -!!$ call psb_cd_set_bld(desc,info) -!!$ if (info /= psb_success_) then -!!$ info=psb_err_from_subroutine_ -!!$ call psb_errpush(info,name,a_err='psb_cd_set_bld') -!!$ Goto 9999 -!!$ end if - call psb_erractionrestore(err_act) return diff --git a/base/tools/psb_cdall.f90 b/base/tools/psb_cdall.f90 index ae4e54ca..768f5759 100644 --- a/base/tools/psb_cdall.f90 +++ b/base/tools/psb_cdall.f90 @@ -7,7 +7,7 @@ subroutine psb_cdall(ictxt, desc, info,mg,ng,parts,vg,vl,flag,nl,repl, globalche use psb_base_tools_mod, psb_protect_name => psb_cdall use psi_mod implicit None - include 'parts.fh' + procedure(psb_parts) :: parts integer(psb_ipk_), intent(in) :: mg,ng,ictxt, vg(:), vl(:),nl integer(psb_ipk_), intent(in) :: flag logical, intent(in) :: repl, globalcheck @@ -19,7 +19,7 @@ subroutine psb_cdall(ictxt, desc, info,mg,ng,parts,vg,vl,flag,nl,repl, globalche interface subroutine psb_cdals(m, n, parts, ictxt, desc, info) use psb_descriptor_type - include 'parts.fh' + procedure(psb_parts) :: parts integer(psb_ipk_), intent(in) :: m,n,ictxt Type(psb_desc_type), intent(out) :: desc integer(psb_ipk_), intent(out) :: info @@ -168,12 +168,9 @@ subroutine psb_cdall(ictxt, desc, info,mg,ng,parts,vg,vl,flag,nl,repl, globalche call psb_errpush(info,name,a_err='psb_realloc') Goto 999 end if -!!$ desc%matrix_data(psb_pnt_h_) = 1 desc%halo_index(:) = -1 desc%ext_index(:) = -1 call psb_cd_set_bld(desc,info) -!!$ desc%matrix_data(psb_n_row_) = desc%indxmap%get_lr() -!!$ desc%matrix_data(psb_n_col_) = desc%indxmap%get_lc() if (info /= psb_success_) goto 999 call psb_erractionrestore(err_act) diff --git a/base/tools/psb_cdals.f90 b/base/tools/psb_cdals.f90 index fa4ae30a..d00067bb 100644 --- a/base/tools/psb_cdals.f90 +++ b/base/tools/psb_cdals.f90 @@ -50,7 +50,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) use psb_list_map_mod use psb_hash_map_mod implicit None - include 'parts.fh' + procedure(psb_parts) :: parts !....Parameters... integer(psb_ipk_), intent(in) :: M,N,ictxt Type(psb_desc_type), intent(out) :: desc @@ -125,9 +125,6 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) ! count local rows number loc_row = max(1,(m+np-1)/np) ! allocate work vector -!!$ allocate(desc%matrix_data(psb_mdata_size_),& -!!$ & temp_ovrlap(max(1,2*loc_row)), prc_v(np),stat=info) -!!$ desc%matrix_data(:) = 0 allocate(temp_ovrlap(max(1,2*loc_row)), prc_v(np),stat=info) if (info /= psb_success_) then @@ -137,12 +134,6 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) call psb_errpush(err,name,int_err,a_err='integer') goto 9999 endif -!!$ desc%matrix_data(psb_m_) = m -!!$ desc%matrix_data(psb_n_) = n -!!$ ! This has to be set BEFORE any call to SET_BLD -!!$ desc%matrix_data(psb_ctxt_) = ictxt -!!$ call psb_get_mpicomm(ictxt,desc%matrix_data(psb_mpi_c_)) - if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': starting main loop' ,info @@ -171,6 +162,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) allocate(psb_repl_map :: desc%indxmap, stat=info) else allocate(psb_hash_map :: desc%indxmap, stat=info) + desc%indxmap%parts => parts end if end if @@ -289,15 +281,6 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) Goto 9999 endif -!!$ ! set fields in desc%MATRIX_DATA.... -!!$ desc%matrix_data(psb_n_row_) = loc_row -!!$ desc%matrix_data(psb_n_col_) = loc_row - -!!$ write(0,*) me,'CDALS: after init ', & -!!$ & desc%indxmap%get_gr(), & -!!$ & desc%indxmap%get_gc(), & -!!$ & desc%indxmap%get_lr(), & -!!$ & desc%indxmap%get_lc() if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': end' diff --git a/base/tools/psb_cdalv.f90 b/base/tools/psb_cdalv.f90 index fb85b202..cb0c09b9 100644 --- a/base/tools/psb_cdalv.f90 +++ b/base/tools/psb_cdalv.f90 @@ -144,11 +144,6 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) call psb_errpush(info,name,i_err=int_err,a_err='integer') goto 9999 endif -!!$ desc%matrix_data(psb_m_) = m -!!$ desc%matrix_data(psb_n_) = n -!!$ ! This has to be set BEFORE any call to SET_BLD -!!$ desc%matrix_data(psb_ctxt_) = ictxt -!!$ call psb_get_mpicomm(ictxt,desc%matrix_data(psb_mpi_c_)) if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': starting main loop' ,info @@ -181,6 +176,8 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) else if (psb_cd_choose_large_state(ictxt,m)) then allocate(psb_hash_map :: desc%indxmap, stat=info) + if (info == 0) allocate(desc%indxmap%tempvg(m),stat=info) + if (info ==0) desc%indxmap%tempvg(1:m) = v(1:m) - flag_ else allocate(psb_glist_map :: desc%indxmap, stat=info) end if @@ -211,10 +208,6 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) goto 9999 endif -!!$ ! set fields in desc%MATRIX_DATA.... -!!$ desc%matrix_data(psb_n_row_) = loc_row -!!$ desc%matrix_data(psb_n_col_) = loc_row -!!$ if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': end' diff --git a/base/tools/psb_cdcpy.F90 b/base/tools/psb_cdcpy.F90 index 765401f2..af0bb3e2 100644 --- a/base/tools/psb_cdcpy.F90 +++ b/base/tools/psb_cdcpy.F90 @@ -86,12 +86,6 @@ subroutine psb_cdcpy(desc_in, desc_out, info) if (allocated(desc_in%indxmap)) then -!!$ if (allocated(desc_out%indxmap)) then -!!$ ! This should never happen -!!$ call desc_out%indxmap%free() -!!$ deallocate(desc_out%indxmap) -!!$ end if -!!$ write(debug_unit,*) me,' ',trim(name),': Calling allocate(SOURCE = )' #ifdef SOURCE_WORKAROUND call desc_in%indxmap%clone(desc_out%indxmap,info) #else diff --git a/base/tools/psb_icdasb.F90 b/base/tools/psb_icdasb.F90 index 41c91a29..747c6475 100644 --- a/base/tools/psb_icdasb.F90 +++ b/base/tools/psb_icdasb.F90 @@ -36,13 +36,13 @@ ! The user callable routine is defined in the psb_tools_mod module. ! ! Arguments: -! desc_a - type(psb_desc_type). The communication descriptor. +! desc - type(psb_desc_type). The communication descriptor. ! info - integer. return code. ! ext_hv - logical Essentially this distinguishes a call ! coming from the build of an extended ! halo descriptor with respect to a normal call. ! -subroutine psb_icdasb(desc_a,info,ext_hv) +subroutine psb_icdasb(desc,info,ext_hv) use psb_base_mod, psb_protect_name => psb_icdasb use psi_mod #ifdef MPI_MOD @@ -53,7 +53,7 @@ subroutine psb_icdasb(desc_a,info,ext_hv) include 'mpif.h' #endif !...Parameters.... - type(psb_desc_type), intent(inout) :: desc_a + type(psb_desc_type), intent(inout) :: desc integer(psb_ipk_), intent(out) :: info logical, intent(in), optional :: ext_hv @@ -75,10 +75,10 @@ subroutine psb_icdasb(desc_a,info,ext_hv) debug_unit = psb_get_debug_unit() debug_level = psb_get_debug_level() - ictxt = desc_a%get_context() - dectype = desc_a%get_dectype() - n_row = desc_a%get_local_rows() - n_col = desc_a%get_local_cols() + ictxt = desc%get_context() + dectype = desc%get_dectype() + n_row = desc%get_local_rows() + n_col = desc%get_local_cols() call psb_get_mpicomm(ictxt,icomm ) ! check on blacs grid @@ -89,7 +89,7 @@ subroutine psb_icdasb(desc_a,info,ext_hv) goto 9999 endif - if (.not.psb_is_ok_desc(desc_a)) then + if (.not.psb_is_ok_desc(desc)) then info = psb_err_spmat_invalid_state_ int_err(1) = dectype call psb_errpush(info,name) @@ -113,22 +113,22 @@ subroutine psb_icdasb(desc_a,info,ext_hv) if (debug_level >= psb_debug_ext_) & & write(debug_unit, *) me,' ',trim(name),': start' - if (allocated(desc_a%indxmap)) then - call psi_ldsc_pre_halo(desc_a,ext_hv_,info) + if (allocated(desc%indxmap)) then + call psi_ldsc_pre_halo(desc,ext_hv_,info) if (info /= psb_success_) then call psb_errpush(psb_err_from_subroutine_,name,a_err='ldsc_pre_halo') goto 9999 end if ! Take out the lists for ovrlap, halo and ext... - call psb_move_alloc(desc_a%ovrlap_index,ovrlap_index,info) - call psb_move_alloc(desc_a%halo_index,halo_index,info) - call psb_move_alloc(desc_a%ext_index,ext_index,info) + call psb_move_alloc(desc%ovrlap_index,ovrlap_index,info) + call psb_move_alloc(desc%halo_index,halo_index,info) + call psb_move_alloc(desc%ext_index,ext_index,info) if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': Final conversion' ! Then convert and put them back where they belong. - call psi_cnv_dsc(halo_index,ovrlap_index,ext_index,desc_a,info) + call psi_cnv_dsc(halo_index,ovrlap_index,ext_index,desc,info) if (info /= psb_success_) then call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_cnv_dsc') @@ -142,17 +142,16 @@ subroutine psb_icdasb(desc_a,info,ext_hv) goto 9999 end if - call desc_a%indxmap%asb(info) + call desc%indxmap%asb(info) + if (info == psb_success_) then + if (allocated(desc%indxmap%tempvg)) & + & deallocate(desc%indxmap%tempvg,stat=info) + end if if (info /= psb_success_) then write(0,*) 'Error from internal indxmap asb ',info info = psb_success_ end if -!!$ desc_a%matrix_data(psb_n_row_) = desc_a%indxmap%get_lr() -!!$ desc_a%matrix_data(psb_n_col_) = desc_a%indxmap%get_lc() -!!$ ! Ok, register into MATRIX_DATA -!!$ desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_ - else info = psb_err_spmat_invalid_state_ call psb_errpush(info,name) diff --git a/util/psb_mat_dist_impl.f90 b/util/psb_mat_dist_impl.f90 deleted file mode 100644 index 02da5941..00000000 --- a/util/psb_mat_dist_impl.f90 +++ /dev/null @@ -1,1797 +0,0 @@ -!!$ -!!$ Parallel Sparse BLAS version 3.0 -!!$ (C) Copyright 2006, 2007, 2008, 2009, 2010 -!!$ Salvatore Filippone University of Rome Tor Vergata -!!$ Alfredo Buttari CNRS-IRIT, Toulouse -!!$ -!!$ Redistribution and use in source and binary forms, with or without -!!$ modification, are permitted provided that the following conditions -!!$ are met: -!!$ 1. Redistributions of source code must retain the above copyright -!!$ notice, this list of conditions and the following disclaimer. -!!$ 2. Redistributions in binary form must reproduce the above copyright -!!$ notice, this list of conditions, and the following disclaimer in the -!!$ documentation and/or other materials provided with the distribution. -!!$ 3. The name of the PSBLAS group or the names of its contributors may -!!$ not be used to endorse or promote products derived from this -!!$ software without specific written permission. -!!$ -!!$ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -!!$ ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED -!!$ TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -!!$ PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PSBLAS GROUP OR ITS CONTRIBUTORS -!!$ BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -!!$ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -!!$ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -!!$ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -!!$ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -!!$ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -!!$ POSSIBILITY OF SUCH DAMAGE. -!!$ -!!$ -subroutine smatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - use psb_base_mod - use psb_mat_mod - implicit none - - ! parameters - type(psb_sspmat_type) :: a_glob - real(psb_spk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_sspmat_type) :: a - real(psb_spk_), allocatable :: b(:) - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt - class(psb_s_base_sparse_mat), optional :: mold - - integer(psb_ipk_) :: v(:) - interface - subroutine parts(global_indx,n,np,pv,nv) - implicit none - integer(psb_ipk_), intent(in) :: global_indx, n, np - integer(psb_ipk_), intent(out) :: nv - integer(psb_ipk_), intent(out) :: pv(*) - end subroutine parts - end interface - optional :: parts, v - - ! local variables - logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam - integer(psb_ipk_) :: length_row, i_count, j_count,& - & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& - & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - real(psb_spk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 - real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 - character(len=20) :: name, ch_err - - info = psb_success_ - err = 0 - name = 'mat_distf' - call psb_erractionsave(err_act) - - ! executable statements - if (present(inroot)) then - root = inroot - else - root = psb_root_ - end if - call psb_info(ictxt, iam, np) - if (iam == root) then - nrow = a_glob%get_nrows() - ncol = a_glob%get_ncols() - if (nrow /= ncol) then - write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol - info=-1 - call psb_errpush(info,name) - goto 9999 - endif - nnzero = a_glob%get_nzeros() - nrhs = 1 - endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - - ! broadcast informations to other processors - call psb_bcast(ictxt,nrow, root) - call psb_bcast(ictxt,ncol, root) - call psb_bcast(ictxt,nnzero, root) - call psb_bcast(ictxt,nrhs, root) - liwork = max(np, nrow + ncol) - allocate(iwork(liwork), stat = info) - if (info /= psb_success_) then - info=psb_err_alloc_request_ - int_err(1)=liwork - call psb_errpush(info,name,i_err=int_err,a_err='integer') - goto 9999 - endif - if (iam == root) then - write (*, fmt = *) 'start matdist',root, size(iwork),& - &nrow, ncol, nnzero,nrhs - endif - if (use_parts) then - call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) - else - info = -1 - end if - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_cdall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psspall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) - allocate(val(isize),irow(isize),icol(isize),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - i_count = 1 - - do while (i_count <= nrow) - - if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count - iproc = iwork(1) - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit - if (iwork(1) /= iproc ) exit - end do - end if - else - length_row = 1 - j_count = i_count - iproc = v(i_count) - - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit - end do - end if - - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (iproc == iam) then - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_spins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,nnr,iproc) - call psb_snd(ictxt,ll,iproc) - call psb_snd(ictxt,irow(1:ll),iproc) - call psb_snd(ictxt,icol(1:ll),iproc) - call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) - call psb_rcv(ictxt,ll,iproc) - endif - else if (iam /= root) then - - if (iproc == iam) then - call psb_rcv(ictxt,nnr,root) - call psb_rcv(ictxt,ll,root) - if (ll > size(irow)) then - write(psb_err_unit,*) iam,'need to reallocate ',ll - deallocate(val,irow,icol) - allocate(val(ll),irow(ll),icol(ll),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - endif - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - end do - i_count = i_count + 1 - endif - end do - - call psb_barrier(ictxt) - t0 = psb_wtime() - call psb_cdasb(desc_a,info) - t1 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_cdasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - call psb_barrier(ictxt) - t2 = psb_wtime() - call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) - t3 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_spasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - if (iam == root) then - write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 - write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 - end if - - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - deallocate(val,irow,icol,stat=info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='deallocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - deallocate(iwork) - if (iam == root) write (*, fmt = *) 'end matdist' - - call psb_erractionrestore(err_act) - return - -9999 continue - call psb_erractionrestore(err_act) - if (err_act == psb_act_abort_) then - call psb_error(ictxt) - return - end if - return - -end subroutine smatdist - - -subroutine dmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - use psb_base_mod - use psb_mat_mod - implicit none - - ! parameters - type(psb_dspmat_type) :: a_glob - real(psb_dpk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_dspmat_type) :: a - real(psb_dpk_), allocatable :: b(:) - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt - class(psb_d_base_sparse_mat), optional :: mold - - integer(psb_ipk_) :: v(:) - interface - subroutine parts(global_indx,n,np,pv,nv) - implicit none - integer(psb_ipk_), intent(in) :: global_indx, n, np - integer(psb_ipk_), intent(out) :: nv - integer(psb_ipk_), intent(out) :: pv(*) - end subroutine parts - end interface - optional :: parts, v - - ! local variables - logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam - integer(psb_ipk_) :: length_row, i_count, j_count,& - & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& - & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - real(psb_dpk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 - real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 - character(len=20) :: name, ch_err - - info = psb_success_ - err = 0 - name = 'mat_distf' - call psb_erractionsave(err_act) - - ! executable statements - if (present(inroot)) then - root = inroot - else - root = psb_root_ - end if - call psb_info(ictxt, iam, np) - if (iam == root) then - nrow = a_glob%get_nrows() - ncol = a_glob%get_ncols() - if (nrow /= ncol) then - write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol - info=-1 - call psb_errpush(info,name) - goto 9999 - endif - nnzero = a_glob%get_nzeros() - nrhs = 1 - endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - - ! broadcast informations to other processors - call psb_bcast(ictxt,nrow, root) - call psb_bcast(ictxt,ncol, root) - call psb_bcast(ictxt,nnzero, root) - call psb_bcast(ictxt,nrhs, root) - liwork = max(np, nrow + ncol) - allocate(iwork(liwork), stat = info) - if (info /= psb_success_) then - info=psb_err_alloc_request_ - int_err(1)=liwork - call psb_errpush(info,name,i_err=int_err,a_err='integer') - goto 9999 - endif - if (iam == root) then - write (*, fmt = *) 'start matdist',root, size(iwork),& - &nrow, ncol, nnzero,nrhs, use_parts, use_v - endif - if (use_parts) then - call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) - else - info = -1 - end if - - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_cdall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psspall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) - allocate(val(isize),irow(isize),icol(isize),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - i_count = 1 - - do while (i_count <= nrow) - - if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count - iproc = iwork(1) - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit - if (iwork(1) /= iproc ) exit - end do - end if - else - length_row = 1 - j_count = i_count - iproc = v(i_count) - - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit - end do - end if - - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do -!!$ write(0,*) 'mat_dist: sending rows ',i_count,j_count-1,' to proc',iproc, ll -!!$ write(0,*) 'mat_dist: sending ',irow(ll),icol(ll),val(ll ) - if (iproc == iam) then - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_spins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,nnr,iproc) - call psb_snd(ictxt,ll,iproc) - call psb_snd(ictxt,irow(1:ll),iproc) - call psb_snd(ictxt,icol(1:ll),iproc) - call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) - call psb_rcv(ictxt,ll,iproc) - endif - else if (iam /= root) then - - if (iproc == iam) then - call psb_rcv(ictxt,nnr,root) - call psb_rcv(ictxt,ll,root) - if (ll > size(irow)) then - write(psb_err_unit,*) iam,'need to reallocate ',ll - deallocate(val,irow,icol) - allocate(val(ll),irow(ll),icol(ll),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - endif - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - end do - i_count = i_count + 1 - endif - end do - - - call psb_barrier(ictxt) - t0 = psb_wtime() - call psb_cdasb(desc_a,info) - t1 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_cdasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - call psb_barrier(ictxt) - t2 = psb_wtime() - call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) - t3 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_spasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - if (iam == root) then - write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 - write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 - end if - - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - deallocate(val,irow,icol,stat=info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='deallocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - deallocate(iwork) - if (iam == root) write (*, fmt = *) 'end matdist' - - call psb_erractionrestore(err_act) - return - -9999 continue - call psb_erractionrestore(err_act) - if (err_act == psb_act_abort_) then - call psb_error(ictxt) - return - end if - return - -end subroutine dmatdist - - -subroutine cmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - use psb_base_mod - use psb_mat_mod - implicit none - - ! parameters - type(psb_cspmat_type) :: a_glob - complex(psb_spk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_cspmat_type) :: a - complex(psb_spk_), allocatable :: b(:) - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt - class(psb_c_base_sparse_mat), optional :: mold - - integer(psb_ipk_) :: v(:) - interface - subroutine parts(global_indx,n,np,pv,nv) - implicit none - integer(psb_ipk_), intent(in) :: global_indx, n, np - integer(psb_ipk_), intent(out) :: nv - integer(psb_ipk_), intent(out) :: pv(*) - end subroutine parts - end interface - optional :: parts, v - - ! local variables - logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam - integer(psb_ipk_) :: length_row, i_count, j_count,& - & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& - & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - complex(psb_spk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 - real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 - character(len=20) :: name, ch_err - - info = psb_success_ - err = 0 - name = 'mat_distf' - call psb_erractionsave(err_act) - - ! executable statements - if (present(inroot)) then - root = inroot - else - root = psb_root_ - end if - call psb_info(ictxt, iam, np) - if (iam == root) then - nrow = a_glob%get_nrows() - ncol = a_glob%get_ncols() - if (nrow /= ncol) then - write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol - info=-1 - call psb_errpush(info,name) - goto 9999 - endif - nnzero = a_glob%get_nzeros() - nrhs = 1 - endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - - ! broadcast informations to other processors - call psb_bcast(ictxt,nrow, root) - call psb_bcast(ictxt,ncol, root) - call psb_bcast(ictxt,nnzero, root) - call psb_bcast(ictxt,nrhs, root) - liwork = max(np, nrow + ncol) - allocate(iwork(liwork), stat = info) - if (info /= psb_success_) then - info=psb_err_alloc_request_ - int_err(1)=liwork - call psb_errpush(info,name,i_err=int_err,a_err='integer') - goto 9999 - endif - if (iam == root) then - write (*, fmt = *) 'start matdist',root, size(iwork),& - &nrow, ncol, nnzero,nrhs - endif - if (use_parts) then - call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) - else - info = -1 - end if - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_cdall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psspall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) - allocate(val(isize),irow(isize),icol(isize),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - i_count = 1 - - do while (i_count <= nrow) - - if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count - iproc = iwork(1) - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit - if (iwork(1) /= iproc ) exit - end do - end if - else - length_row = 1 - j_count = i_count - iproc = v(i_count) - - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit - end do - end if - - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (iproc == iam) then - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_spins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,nnr,iproc) - call psb_snd(ictxt,ll,iproc) - call psb_snd(ictxt,irow(1:ll),iproc) - call psb_snd(ictxt,icol(1:ll),iproc) - call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) - call psb_rcv(ictxt,ll,iproc) - endif - else if (iam /= root) then - - if (iproc == iam) then - call psb_rcv(ictxt,nnr,root) - call psb_rcv(ictxt,ll,root) - if (ll > size(irow)) then - write(psb_err_unit,*) iam,'need to reallocate ',ll - deallocate(val,irow,icol) - allocate(val(ll),irow(ll),icol(ll),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - endif - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - end do - i_count = i_count + 1 - endif - end do - - call psb_barrier(ictxt) - t0 = psb_wtime() - call psb_cdasb(desc_a,info) - t1 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_cdasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - call psb_barrier(ictxt) - t2 = psb_wtime() - call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) - t3 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_spasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - if (iam == root) then - write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 - write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 - end if - - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - deallocate(val,irow,icol,stat=info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='deallocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - deallocate(iwork) - if (iam == root) write (*, fmt = *) 'end matdist' - - call psb_erractionrestore(err_act) - return - -9999 continue - call psb_erractionrestore(err_act) - if (err_act == psb_act_abort_) then - call psb_error(ictxt) - return - end if - return - -end subroutine cmatdist - - -subroutine zmatdist(a_glob, a, ictxt, desc_a,& - & b_glob, b, info, parts, v, inroot,fmt,mold) - ! - ! an utility subroutine to distribute a matrix among processors - ! according to a user defined data distribution, using - ! sparse matrix subroutines. - ! - ! type(d_spmat) :: a_glob - ! on entry: this contains the global sparse matrix as follows: - ! a%fida == 'csr' - ! a%aspk for coefficient values - ! a%ia1 for column indices - ! a%ia2 for row pointers - ! a%m for number of global matrix rows - ! a%k for number of global matrix columns - ! on exit : undefined, with unassociated pointers. - ! - ! type(d_spmat) :: a - ! on entry: fresh variable. - ! on exit : this will contain the local sparse matrix. - ! - ! interface parts - ! ! .....user passed subroutine..... - ! subroutine parts(global_indx,n,np,pv,nv) - ! implicit none - ! integer(psb_ipk_), intent(in) :: global_indx, n, np - ! integer(psb_ipk_), intent(out) :: nv - ! integer(psb_ipk_), intent(out) :: pv(*) - ! - ! end subroutine parts - ! end interface - ! on entry: subroutine providing user defined data distribution. - ! for each global_indx the subroutine should return - ! the list pv of all processes owning the row with - ! that index; the list will contain nv entries. - ! usually nv=1; if nv >1 then we have an overlap in the data - ! distribution. - ! - ! integer(psb_ipk_) :: ictxt - ! on entry: blacs context. - ! on exit : unchanged. - ! - ! type (desc_type) :: desc_a - ! on entry: fresh variable. - ! on exit : the updated array descriptor - ! - ! real(psb_dpk_), optional :: b_glob(:) - ! on entry: this contains right hand side. - ! on exit : - ! - ! real(psb_dpk_), allocatable, optional :: b(:) - ! on entry: fresh variable. - ! on exit : this will contain the local right hand side. - ! - ! integer(psb_ipk_), optional :: inroot - ! on entry: specifies processor holding a_glob. default: 0 - ! on exit : unchanged. - ! - use psb_base_mod - use psb_mat_mod - implicit none - - ! parameters - type(psb_zspmat_type) :: a_glob - complex(psb_dpk_) :: b_glob(:) - integer(psb_ipk_) :: ictxt - type(psb_zspmat_type) :: a - complex(psb_dpk_), allocatable :: b(:) - type(psb_desc_type) :: desc_a - integer(psb_ipk_), intent(out) :: info - integer(psb_ipk_), optional :: inroot - character(len=5), optional :: fmt - class(psb_z_base_sparse_mat), optional :: mold - - integer(psb_ipk_) :: v(:) - interface - subroutine parts(global_indx,n,np,pv,nv) - implicit none - integer(psb_ipk_), intent(in) :: global_indx, n, np - integer(psb_ipk_), intent(out) :: nv - integer(psb_ipk_), intent(out) :: pv(*) - end subroutine parts - end interface - optional :: parts, v - - ! local variables - logical :: use_parts, use_v - integer(psb_ipk_) :: np, iam - integer(psb_ipk_) :: length_row, i_count, j_count,& - & k_count, root, liwork, nrow, ncol, nnzero, nrhs,& - & i, ll, nz, isize, iproc, nnr, err, err_act, int_err(5) - integer(psb_ipk_), allocatable :: iwork(:) - integer(psb_ipk_), allocatable :: irow(:),icol(:) - complex(psb_dpk_), allocatable :: val(:) - integer(psb_ipk_), parameter :: nb=30 - real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 - character(len=20) :: name, ch_err - - info = psb_success_ - err = 0 - name = 'mat_distf' - call psb_erractionsave(err_act) - - ! executable statements - if (present(inroot)) then - root = inroot - else - root = psb_root_ - end if - call psb_info(ictxt, iam, np) - if (iam == root) then - nrow = a_glob%get_nrows() - ncol = a_glob%get_ncols() - if (nrow /= ncol) then - write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol - info=-1 - call psb_errpush(info,name) - goto 9999 - endif - nnzero = a_glob%get_nzeros() - nrhs = 1 - endif - - use_parts = present(parts) - use_v = present(v) - if (count((/ use_parts, use_v /)) /= 1) then - info=psb_err_no_optional_arg_ - call psb_errpush(info,name,a_err=" v, parts") - goto 9999 - endif - - ! broadcast informations to other processors - call psb_bcast(ictxt,nrow, root) - call psb_bcast(ictxt,ncol, root) - call psb_bcast(ictxt,nnzero, root) - call psb_bcast(ictxt,nrhs, root) - liwork = max(np, nrow + ncol) - allocate(iwork(liwork), stat = info) - if (info /= psb_success_) then - info=psb_err_alloc_request_ - int_err(1)=liwork - call psb_errpush(info,name,i_err=int_err,a_err='integer') - goto 9999 - endif - if (iam == root) then - write (*, fmt = *) 'start matdist',root, size(iwork),& - &nrow, ncol, nnzero,nrhs - endif - if (use_parts) then - call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) - else if (use_v) then - call psb_cdall(ictxt,desc_a,info,vg=v) - else - info = -1 - end if - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_cdall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_spall(a,desc_a,info,nnz=((nnzero+np-1)/np)) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psspall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geall(b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_psdsall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - - isize = 3*nb*max(((nnzero+nrow)/nrow),nb) - allocate(val(isize),irow(isize),icol(isize),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - i_count = 1 - - do while (i_count <= nrow) - - if (use_parts) then - call parts(i_count,nrow,np,iwork, length_row) - if (length_row == 1) then - j_count = i_count - iproc = iwork(1) - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - call parts(j_count,nrow,np,iwork, length_row) - if (length_row /= 1 ) exit - if (iwork(1) /= iproc ) exit - end do - end if - else - length_row = 1 - j_count = i_count - iproc = v(i_count) - - do - j_count = j_count + 1 - if (j_count-i_count >= nb) exit - if (j_count > nrow) exit - if (v(j_count) /= iproc ) exit - end do - end if - - if (length_row == 1) then - ! now we should insert rows i_count..j_count-1 - nnr = j_count - i_count - - if (iam == root) then - - ll = 0 - do i= i_count, j_count-1 - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (iproc == iam) then - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_spins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,j_count-1)/),b_glob(i_count:j_count-1),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psb_ins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,nnr,iproc) - call psb_snd(ictxt,ll,iproc) - call psb_snd(ictxt,irow(1:ll),iproc) - call psb_snd(ictxt,icol(1:ll),iproc) - call psb_snd(ictxt,val(1:ll),iproc) - call psb_snd(ictxt,b_glob(i_count:j_count-1),iproc) - call psb_rcv(ictxt,ll,iproc) - endif - else if (iam /= root) then - - if (iproc == iam) then - call psb_rcv(ictxt,nnr,root) - call psb_rcv(ictxt,ll,root) - if (ll > size(irow)) then - write(psb_err_unit,*) iam,'need to reallocate ',ll - deallocate(val,irow,icol) - allocate(val(ll),irow(ll),icol(ll),stat=info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='Allocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - endif - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count:i_count+nnr-1),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(nnr,(/(i,i=i_count,i_count+nnr-1)/),& - & b_glob(i_count:i_count+nnr-1),b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - - i_count = j_count - - else - - ! here processors are counted 1..np - do j_count = 1, length_row - k_count = iwork(j_count) - if (iam == root) then - - ll = 0 - do i= i_count, i_count - call a_glob%csget(i,i,nz,& - & irow,icol,val,info,nzin=ll,append=.true.) - if (info /= psb_success_) then - if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then - write(psb_err_unit,*) 'Allocation failure? This should not happen!' - end if - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - ll = ll + nz - end do - - if (k_count == iam) then - - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - else - call psb_snd(ictxt,ll,k_count) - call psb_snd(ictxt,irow(1:ll),k_count) - call psb_snd(ictxt,icol(1:ll),k_count) - call psb_snd(ictxt,val(1:ll),k_count) - call psb_snd(ictxt,b_glob(i_count),k_count) - call psb_rcv(ictxt,ll,k_count) - endif - else if (iam /= root) then - if (k_count == iam) then - call psb_rcv(ictxt,ll,root) - call psb_rcv(ictxt,irow(1:ll),root) - call psb_rcv(ictxt,icol(1:ll),root) - call psb_rcv(ictxt,val(1:ll),root) - call psb_rcv(ictxt,b_glob(i_count),root) - call psb_snd(ictxt,ll,root) - call psb_spins(ll,irow,icol,val,a,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psspins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - call psb_geins(1,(/i_count/),b_glob(i_count:i_count),& - & b,desc_a,info) - if(info /= psb_success_) then - info=psb_err_from_subroutine_ - ch_err='psdsins' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - endif - endif - end do - i_count = i_count + 1 - endif - end do - - call psb_barrier(ictxt) - t0 = psb_wtime() - call psb_cdasb(desc_a,info) - t1 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_cdasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - call psb_barrier(ictxt) - t2 = psb_wtime() - call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) - t3 = psb_wtime() - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psb_spasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - - if (iam == root) then - write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 - write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 - end if - - call psb_geasb(b,desc_a,info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='psdsasb' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - deallocate(val,irow,icol,stat=info) - if(info /= psb_success_)then - info=psb_err_from_subroutine_ - ch_err='deallocate' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - - deallocate(iwork) - if (iam == root) write (*, fmt = *) 'end matdist' - - call psb_erractionrestore(err_act) - return - -9999 continue - call psb_erractionrestore(err_act) - if (err_act == psb_act_abort_) then - call psb_error(ictxt) - return - end if - return - -end subroutine zmatdist -