diff --git a/Makefile b/Makefile index 829d4fc1..41caa3cd 100644 --- a/Makefile +++ b/Makefile @@ -15,15 +15,15 @@ libd: (if test ! -d include ; then mkdir include; fi; $(INSTALL_DATA) Make.inc include/Make.inc.psblas) (if test ! -d modules ; then mkdir modules; fi;) based: - cd base && $(MAKE) lib + $(MAKE) -C base lib precd: - cd prec && $(MAKE) lib + $(MAKE) -C prec lib kryld: - cd krylov && $(MAKE) lib + $(MAKE) -C krylov lib utild: - cd util&& $(MAKE) lib + $(MAKE) -C util lib cbindd: - cd cbind&& $(MAKE) lib + $(MAKE) -C cbind lib install: all mkdir -p $(INSTALL_INCLUDEDIR) &&\ @@ -42,11 +42,11 @@ install: all /bin/cp -fr test/pargen test/fileread test/kernel $(INSTALL_SAMPLESDIR) && \ mkdir -p $(INSTALL_SAMPLESDIR)/cbind && /bin/cp -fr cbind/test/pargen/* $(INSTALL_SAMPLESDIR)/cbind clean: - cd base && $(MAKE) clean - cd prec && $(MAKE) clean - cd krylov && $(MAKE) clean - cd util && $(MAKE) clean - cd cbind && $(MAKE) clean + $(MAKE) -C base clean + $(MAKE) -C prec clean + $(MAKE) -C krylov clean + $(MAKE) -C util clean + $(MAKE) -C cbind clean check: all make check -C test/serial diff --git a/base/Makefile b/base/Makefile index 3304176c..039a5963 100644 --- a/base/Makefile +++ b/base/Makefile @@ -13,25 +13,25 @@ lib: mods sr cm in pb tl sr cm in pb tl: mods mods: - cd modules && $(MAKE) lib LIBNAME=$(BASELIBNAME) F90="$(MPF90)" F90COPT="$(F90COPT) $(MPI_OPT)" + $(MAKE) -C modules lib LIBNAME=$(BASELIBNAME) F90="$(MPF90)" F90COPT="$(F90COPT) $(MPI_OPT)" sr: - cd serial && $(MAKE) lib LIBNAME=$(BASELIBNAME) + $(MAKE) -C serial lib LIBNAME=$(BASELIBNAME) cm: - cd comm && $(MAKE) lib LIBNAME=$(BASELIBNAME) + $(MAKE) -C comm lib LIBNAME=$(BASELIBNAME) in: - cd internals && $(MAKE) lib LIBNAME=$(BASELIBNAME) + $(MAKE) -C internals lib LIBNAME=$(BASELIBNAME) pb: - cd psblas && $(MAKE) lib LIBNAME=$(BASELIBNAME) + $(MAKE) -C psblas lib LIBNAME=$(BASELIBNAME) tl: - cd tools && $(MAKE) lib LIBNAME=$(BASELIBNAME) + $(MAKE) -C tools lib LIBNAME=$(BASELIBNAME) clean: - (cd modules; $(MAKE) clean) - (cd comm; $(MAKE) clean) - (cd internals; $(MAKE) clean) - (cd tools; $(MAKE) clean) - (cd serial; $(MAKE) clean) - (cd psblas; $(MAKE) clean) + ($(MAKE) -C modules clean) + ($(MAKE) -C comm clean) + ($(MAKE) -C internals clean) + ($(MAKE) -C tools clean) + ($(MAKE) -C serial clean) + ($(MAKE) -C psblas clean) veryclean: clean /bin/rm -f $(HERE)/$(LIBNAME) $(LIBMOD) *$(.mod) diff --git a/base/comm/Makefile b/base/comm/Makefile index f27d38cf..950a95a0 100644 --- a/base/comm/Makefile +++ b/base/comm/Makefile @@ -32,7 +32,7 @@ lib: interns mpfobjs $(OBJS) $(RANLIB) $(LIBDIR)/$(LIBNAME) interns: - cd internals && $(MAKE) lib + $(MAKE) -C internals lib mpfobjs: $(MAKE) $(MPFOBJS) FC="$(MPFC)" diff --git a/base/internals/psi_graph_fnd_owner.F90 b/base/internals/psi_graph_fnd_owner.F90 index 2ecdb24c..b951945e 100644 --- a/base/internals/psi_graph_fnd_owner.F90 +++ b/base/internals/psi_graph_fnd_owner.F90 @@ -162,7 +162,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info) ! call psb_safe_ab_cpy(idxmap%p_adjcncy,ladj,info) nadj = psb_size(ladj) - ! This makes ladj allocated with size 0 just in case + ! This makes ladj allocated with size 0 if needed, as opposed to unallocated call psb_realloc(nadj,ladj,info) ! ! Throughout the subroutine, nreqst is the number of local inquiries diff --git a/base/internals/psi_indx_map_fnd_owner.F90 b/base/internals/psi_indx_map_fnd_owner.F90 index 0aee0806..25f900f0 100644 --- a/base/internals/psi_indx_map_fnd_owner.F90 +++ b/base/internals/psi_indx_map_fnd_owner.F90 @@ -74,7 +74,8 @@ subroutine psi_indx_map_fnd_owner(idx,iprc,idxmap,info) integer(psb_ipk_), allocatable :: hhidx(:) integer(psb_mpk_) :: icomm, minfo, iictxt - integer(psb_ipk_) :: i, err_act, hsize, nv + integer(psb_ipk_) :: i, err_act, hsize + integer(psb_lpk_) :: nv integer(psb_lpk_) :: mglob integer(psb_ipk_) :: ictxt,np,me, nresp logical, parameter :: gettime=.false. @@ -140,7 +141,66 @@ subroutine psi_indx_map_fnd_owner(idx,iprc,idxmap,info) else - call psi_graph_fnd_owner(idx,iprc,idxmap,info) + if (allocated(idxmap%halo_owner)) then + ! + ! Maybe we are coming here after a REINIT event. + ! In this case, reuse the existing information as much as possible. + ! + block + integer(psb_ipk_), allocatable :: tprc(:), lidx(:) + integer(psb_lpk_), allocatable :: tidx(:) + integer(psb_lpk_) :: k1, k2, nh + allocate(lidx(nv),stat=info) + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + goto 9999 + end if + ! + ! Get local answers, if any + ! + call idxmap%g2l(idx,lidx,info,owned=.false.) + call idxmap%qry_halo_owner(lidx,iprc,info) + + nh = count(iprc<0) + !write(0,*) me,'Going through new impl from ',nv,' to ',nh + allocate(tidx(nh),tprc(nh),stat=info) + if (info /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='Allocate') + goto 9999 + end if + ! + ! Prepare remote queries + ! + k2 = 0 + do k1 = 1, nv + if (iprc(k1) < 0) then + k2 = k2 + 1 + if (k2 > nh) then + info = psb_err_internal_error_ + call psb_errpush(info,name,a_err='Wrong auxiliary count') + goto 9999 + end if + tidx(k2) = idx(k1) + end if + end do + call psi_graph_fnd_owner(tidx,tprc,idxmap,info) + k2 = 0 + do k1 = 1, nv + if (iprc(k1) < 0) then + k2 = k2 + 1 + if (k2 > nh) then + info = psb_err_internal_error_ + call psb_errpush(info,name,a_err='Wrong auxiliary count') + goto 9999 + end if + iprc(k1) = tprc(k2) + end if + end do + end block + else + call psi_graph_fnd_owner(idx,iprc,idxmap,info) + end if + end if diff --git a/base/modules/desc/psb_hash_map_mod.f90 b/base/modules/desc/psb_hash_map_mod.f90 index 2f4e8653..ae109d5a 100644 --- a/base/modules/desc/psb_hash_map_mod.f90 +++ b/base/modules/desc/psb_hash_map_mod.f90 @@ -94,8 +94,8 @@ module psb_hash_map_mod procedure, pass(idxmap) :: lg2lv1_ins => hash_g2lv1_ins procedure, pass(idxmap) :: lg2lv2_ins => hash_g2lv2_ins - procedure, pass(idxmap) :: hash_cpy - generic, public :: assignment(=) => hash_cpy +!!$ procedure, pass(idxmap) :: hash_cpy +!!$ generic, public :: assignment(=) => hash_cpy procedure, pass(idxmap) :: bld_g2l_map => hash_bld_g2l_map end type psb_hash_map @@ -1464,7 +1464,7 @@ contains end if select type (outmap) - type is (psb_hash_map) + type is (psb_hash_map) call idxmap%psb_indx_map%cpy(outmap%psb_indx_map,info) if (info == psb_success_) then outmap%hashvsize = idxmap%hashvsize @@ -1478,7 +1478,7 @@ contains & call psb_safe_ab_cpy(idxmap%glb_lc,outmap%glb_lc,info) if (info == psb_success_)& & call psb_hash_copy(idxmap%hash,outmap%hash,info) -!!$ outmap = idxmap + class default ! This should be impossible info = -1 @@ -1499,28 +1499,30 @@ contains end subroutine hash_clone - subroutine hash_cpy(outmap,idxmap) - use psb_penv_mod - use psb_error_mod - use psb_realloc_mod - implicit none - class(psb_hash_map), intent(in) :: idxmap - type(psb_hash_map), intent(out) :: outmap - integer(psb_ipk_) :: info - - info = psb_success_ - outmap%psb_indx_map = idxmap%psb_indx_map - outmap%hashvsize = idxmap%hashvsize - outmap%hashvmask = idxmap%hashvmask - if (info == psb_success_)& - & call psb_safe_ab_cpy(idxmap%loc_to_glob,outmap%loc_to_glob,info) - if (info == psb_success_)& - & call psb_safe_ab_cpy(idxmap%hashv,outmap%hashv,info) - if (info == psb_success_)& - & call psb_safe_ab_cpy(idxmap%glb_lc,outmap%glb_lc,info) - if (info == psb_success_)& - & call psb_hash_copy(idxmap%hash,outmap%hash,info) - end subroutine hash_cpy +!!$ subroutine hash_cpy(outmap,idxmap) +!!$ use psb_penv_mod +!!$ use psb_error_mod +!!$ use psb_realloc_mod +!!$ implicit none +!!$ class(psb_hash_map), intent(in) :: idxmap +!!$ type(psb_hash_map), intent(out) :: outmap +!!$ integer(psb_ipk_) :: info +!!$ +!!$ info = psb_success_ +!!$ call idxmap%psb_indx_map%cpy(outmap%psb_indx_map,info) +!!$ if (info == psb_success_) then +!!$ outmap%hashvsize = idxmap%hashvsize +!!$ outmap%hashvmask = idxmap%hashvmask +!!$ end if +!!$ if (info == psb_success_)& +!!$ & call psb_safe_ab_cpy(idxmap%loc_to_glob,outmap%loc_to_glob,info) +!!$ if (info == psb_success_)& +!!$ & call psb_safe_ab_cpy(idxmap%hashv,outmap%hashv,info) +!!$ if (info == psb_success_)& +!!$ & call psb_safe_ab_cpy(idxmap%glb_lc,outmap%glb_lc,info) +!!$ if (info == psb_success_)& +!!$ & call psb_hash_copy(idxmap%hash,outmap%hash,info) +!!$ end subroutine hash_cpy subroutine hash_reinit(idxmap,info) use psb_penv_mod @@ -1533,7 +1535,7 @@ contains integer(psb_lpk_) :: lk integer(psb_lpk_) :: ntot integer(psb_ipk_) :: ictxt, me, np - integer(psb_ipk_), allocatable :: lidx(:) + integer(psb_ipk_), allocatable :: lidx(:), tadj(:), th_own(:) integer(psb_lpk_), allocatable :: gidx(:) character(len=20) :: name='hash_reinit' logical, parameter :: debug=.false. @@ -1549,13 +1551,16 @@ contains lidx = (/(k,k=1,nc)/) gidx = (/(lk,lk=1,nc)/) call idxmap%l2gip(gidx,info) - + tadj = idxmap%get_p_adjcncy() + call idxmap%get_halo_owner(th_own,info) + call idxmap%free() call hash_init_vlu(idxmap,ictxt,ntot,nr,gidx(1:nr),info) if (nc>nr) then call idxmap%g2lip_ins(gidx(nr+1:nc),info,lidx=lidx(nr+1:nc)) end if - + call idxmap%set_p_adjcncy(tadj) + call idxmap%set_halo_owner(th_own,info) if (info /= psb_success_) then info = psb_err_from_subroutine_ diff --git a/base/modules/desc/psb_indx_map_mod.f90 b/base/modules/desc/psb_indx_map_mod.f90 index f5aeaa4d..dd72280b 100644 --- a/base/modules/desc/psb_indx_map_mod.f90 +++ b/base/modules/desc/psb_indx_map_mod.f90 @@ -219,9 +219,9 @@ module psb_indx_map_mod procedure, pass(idxmap) :: set_halo_owner => base_set_halo_owner procedure, pass(idxmap) :: get_halo_owner => base_get_halo_owner - procedure, pass(idxmap) :: fnd_halo_owner_s => base_fnd_halo_owner_s - procedure, pass(idxmap) :: fnd_halo_owner_v => base_fnd_halo_owner_v - generic, public :: fnd_halo_owner => fnd_halo_owner_s, fnd_halo_owner_v + procedure, pass(idxmap) :: qry_halo_owner_s => base_qry_halo_owner_s + procedure, pass(idxmap) :: qry_halo_owner_v => base_qry_halo_owner_v + generic, public :: qry_halo_owner => qry_halo_owner_s, qry_halo_owner_v procedure, pass(idxmap) :: fnd_owner => psi_indx_map_fnd_owner procedure, pass(idxmap) :: init_vl => base_init_vl @@ -245,7 +245,7 @@ module psb_indx_map_mod & base_lg2lv2_ins, base_init_vl, base_is_null,& & base_row_extendable, base_clone, base_cpy, base_reinit, & & base_set_halo_owner, base_get_halo_owner, & - & base_fnd_halo_owner_s, base_fnd_halo_owner_v,& + & base_qry_halo_owner_s, base_qry_halo_owner_v,& & base_get_p_adjcncy, base_set_p_adjcncy, base_xtnd_p_adjcncy !> Function: psi_indx_map_fnd_owner @@ -1486,6 +1486,7 @@ contains end subroutine base_set_halo_owner subroutine base_get_halo_owner(idxmap,v,info) + use psb_realloc_mod use psb_penv_mod use psb_error_mod implicit none @@ -1494,13 +1495,15 @@ contains integer(psb_ipk_), intent(out) :: info integer(psb_ipk_) :: nh - nh = min(size(v),size(idxmap%halo_owner)) - v(1:nh) = idxmap%halo_owner(1:nh) + nh = size(idxmap%halo_owner) + !v = idxmap%halo_owner(1:nh) + call psb_safe_ab_cpy(idxmap%halo_owner,v,info) end subroutine base_get_halo_owner - subroutine base_fnd_halo_owner_s(idxmap,xin,xout,info) + subroutine base_qry_halo_owner_s(idxmap,xin,xout,info) use psb_penv_mod use psb_error_mod + use psb_realloc_mod implicit none class(psb_indx_map), intent(inout) :: idxmap integer(psb_ipk_), intent(in) :: xin @@ -1510,24 +1513,26 @@ contains integer(psb_ipk_) :: i, j, nr, nc, nh nr = idxmap%local_rows nc = idxmap%local_cols + nc = min(idxmap%local_cols, (nr+psb_size(idxmap%halo_owner))) xout = -1 if (.not.allocated(idxmap%halo_owner)) then !write(0,*) 'Halo_owner not allocated!', nr, nc, xin return end if if ((nr hlstart) then - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) sdsz(proc+1) = sdsz(proc+1) +1 end if end do @@ -264,19 +263,19 @@ subroutine psb_lc_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) acoo%ja(nzd) = acoo%ja(k) acoo%val(nzd) = acoo%val(k) else - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) tsdx(proc+1) = tsdx(proc+1) +1 iasnd(tsdx(proc+1)) = acoo%ia(k) jasnd(tsdx(proc+1)) = acoo%ja(k) valsnd(tsdx(proc+1)) = acoo%val(k) end if end do + call acoo%set_nzeros(nzd) ! ! Put halo entries in global numbering ! call desc_r%indxmap%l2gip(jasnd(1:iszs),info) call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) - call acoo%set_nzeros(nzd) ! And exchange data. ! Normally we'll use our SIMPLE A2AV and not MPI, because ! the communication pattern is sparse, so ours is more @@ -407,19 +406,293 @@ subroutine psb_c_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) type(psb_desc_type), intent(out), optional :: desc_rx integer(psb_ipk_), intent(out) :: info - type(psb_lc_coo_sparse_mat) :: atcoo + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc, err_act, j + integer(psb_ipk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzd + integer(psb_lpk_) :: nzt, lszr + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:), sdsz(:), tsdx(:), trvx(:) + integer(psb_ipk_), allocatable :: halo_owner(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:),iarcv(:),jarcv(:) + complex(psb_spk_), allocatable :: valsnd(:) + type(psb_c_coo_sparse_mat), allocatable :: acoo + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + type(psb_desc_type), pointer :: p_desc_c + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err - if (present(atrans)) then - call ain%cp_to_lcoo(atcoo,info) + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='mld_glob_transpose' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_r%get_context() + icomm = desc_r%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(desc_c)) then + p_desc_c => desc_c + else + p_desc_c => desc_r + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + if (present(atrans)) then + call ain%cp_to_coo(acoo,info) + else + call ain%mv_to_coo(acoo,info) + end if + + + ! + ! Compute number of entries in the + ! halo part, sorted by destination process + ! + nr = desc_r%get_local_rows() + nc = p_desc_c%get_local_cols() + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + do k=1, nzl + j = acoo%ja(k) + if (j > hlstart) then + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + sdsz(proc+1) = sdsz(proc+1) +1 + end if + end do + + ! + ! Exchange sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! number of entries was not precomputed in the descriptor, + ! which was vector-oriented and not matrix-entry-oriented + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do proc = 0, np-1 + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + Enddo + + tsdx = bsdindx + trvx = brvindx + + iszr = sum(rvsz) + iszs = sum(sdsz) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),iarcv,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),jarcv,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + ! + ! Now, transpose the matrix, then split between itself + ! and the send buffers + ! + call acoo%transp() + if (acoo%get_nzeros()/= nzl) then + write(0,*) me,'Something strange upon transpose: ',nzl,acoo%get_nzeros() + end if + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + + + nzd = 0 + do k = 1, nzl + j = acoo%ia(k) + if (j<=hlstart) then + nzd = nzd + 1 + acoo%ia(nzd) = acoo%ia(k) + acoo%ja(nzd) = acoo%ja(k) + acoo%val(nzd) = acoo%val(k) + else + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + tsdx(proc+1) = tsdx(proc+1) +1 + iasnd(tsdx(proc+1)) = acoo%ia(k) + jasnd(tsdx(proc+1)) = acoo%ja(k) + valsnd(tsdx(proc+1)) = acoo%val(k) + end if + end do + call acoo%set_nzeros(nzd) + ! + ! Put halo entries in global numbering + ! + call desc_r%indxmap%l2gip(jasnd(1:iszs),info) + call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) + ! And exchange data. + ! Normally we'll use our SIMPLE A2AV and not MPI, because + ! the communication pattern is sparse, so ours is more + ! efficient. Using ACOO for the receive buffers. + nzl = acoo%get_nzeros() + call acoo%ensure_size(nzl+iszr) + + select case(psb_get_sp_a2av_alg()) + case(psb_sp_a2av_smpl_triad_) + call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),iarcv(1:iszr),& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_smpl_v_) + call psb_simple_a2av(valsnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& + & iarcv(1:iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_mpi_) + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_spk_,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,psb_mpi_c_spk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & iarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & jarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo + case default + info = psb_err_internal_error_ + call psb_errpush(info,name,a_err='wrong A2AV alg selector') + goto 9999 + end select + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + + if (present(desc_rx)) then + ! + ! Extend the appropriate descriptor; started as R but on + ! transpose it now describes C + ! + call desc_r%clone(desc_rx,info) + call psb_cd_reinit(desc_rx,info) + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + call desc_rx%g2lip_ins(jarcv(1:nzt),info) + call psb_cdasb(desc_rx,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + ! + ! Insert to extend descriptor + ! + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_rx%get_local_cols()) + !write(0,*) me,' Trans RX ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() else - call ain%mv_to_lcoo(atcoo,info) + ! + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + call psb_glob_to_loc(jarcv(1:iszr),desc_r,info,iact='I') + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_r%get_local_cols()) + !write(0,*) me,' Trans R- ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() end if - if (info == 0) call psb_glob_transpose(atcoo,desc_r,info,desc_c=desc_c,desc_rx=desc_rx) + +!!$ write(0,*) me,' Sanity check after rx%g2l :',count(acoo%ja(1:nzl)<0) + + + call acoo%fix(info) + nzl = acoo%get_nzeros() + if (present(atrans)) then - call atrans%mv_from_lcoo(atcoo,info) + call atrans%mv_from_coo(acoo,info) else - call ain%mv_from_lcoo(atcoo,info) + call ain%mv_from_coo(acoo,info) end if + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,& + & iarcv,jarcv,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return end subroutine psb_c_coo_glob_transpose subroutine psb_c_simple_glob_transpose_ip(ain,desc_a,info) @@ -434,7 +707,7 @@ subroutine psb_c_simple_glob_transpose_ip(ain,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_lc_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_c_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz @@ -485,7 +758,7 @@ subroutine psb_c_simple_glob_transpose(ain,aout,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_lc_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_c_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz diff --git a/base/tools/psb_cd_reinit.f90 b/base/tools/psb_cd_reinit.f90 index d579ba95..15dbbc59 100644 --- a/base/tools/psb_cd_reinit.f90 +++ b/base/tools/psb_cd_reinit.f90 @@ -69,6 +69,9 @@ Subroutine psb_cd_reinit(desc,info) call psb_move_alloc(tmp_halo,desc%halo_index,info) call psb_move_alloc(tmp_ext,desc%ext_index,info) call desc%indxmap%reinit(info) +!!$ if (me == 0) write(0,*) 'On cdreinit status :',& +!!$ & allocated(desc%indxmap%p_adjcncy),allocated(desc%indxmap%halo_owner), & +!!$ & desc%get_fmt() ! call psb_cd_set_bld(desc,info) end if diff --git a/base/tools/psb_cspasb.f90 b/base/tools/psb_cspasb.f90 index ae5c0af2..073fcbbd 100644 --- a/base/tools/psb_cspasb.f90 +++ b/base/tools/psb_cspasb.f90 @@ -122,11 +122,11 @@ subroutine psb_cspasb(a,desc_a, info, afmt, upd, dupl, mold) end if - IF (debug_level >= psb_debug_ext_) then + if (debug_level >= psb_debug_ext_) then ch_err=a%get_fmt() write(debug_unit, *) me,' ',trim(name),': From SPCNV',& & info,' ',ch_err - end IF + end if if (psb_errstatus_fatal()) then info=psb_err_from_subroutine_ diff --git a/base/tools/psb_d_glob_transpose.F90 b/base/tools/psb_d_glob_transpose.F90 index 697e7ad2..638ba174 100644 --- a/base/tools/psb_d_glob_transpose.F90 +++ b/base/tools/psb_d_glob_transpose.F90 @@ -113,7 +113,6 @@ subroutine psb_ld_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) integer(psb_ipk_) :: ictxt, np,me integer(psb_ipk_) :: counter,proc, err_act, j integer(psb_lpk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& - & irmin,icmin,irmax,icmax,& & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzt, nzd integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & @@ -190,7 +189,7 @@ subroutine psb_ld_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) do k=1, nzl j = acoo%ja(k) if (j > hlstart) then - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) sdsz(proc+1) = sdsz(proc+1) +1 end if end do @@ -264,19 +263,19 @@ subroutine psb_ld_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) acoo%ja(nzd) = acoo%ja(k) acoo%val(nzd) = acoo%val(k) else - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) tsdx(proc+1) = tsdx(proc+1) +1 iasnd(tsdx(proc+1)) = acoo%ia(k) jasnd(tsdx(proc+1)) = acoo%ja(k) valsnd(tsdx(proc+1)) = acoo%val(k) end if end do + call acoo%set_nzeros(nzd) ! ! Put halo entries in global numbering ! call desc_r%indxmap%l2gip(jasnd(1:iszs),info) call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) - call acoo%set_nzeros(nzd) ! And exchange data. ! Normally we'll use our SIMPLE A2AV and not MPI, because ! the communication pattern is sparse, so ours is more @@ -407,19 +406,293 @@ subroutine psb_d_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) type(psb_desc_type), intent(out), optional :: desc_rx integer(psb_ipk_), intent(out) :: info - type(psb_ld_coo_sparse_mat) :: atcoo + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc, err_act, j + integer(psb_ipk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzd + integer(psb_lpk_) :: nzt, lszr + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:), sdsz(:), tsdx(:), trvx(:) + integer(psb_ipk_), allocatable :: halo_owner(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:),iarcv(:),jarcv(:) + real(psb_dpk_), allocatable :: valsnd(:) + type(psb_d_coo_sparse_mat), allocatable :: acoo + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + type(psb_desc_type), pointer :: p_desc_c + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err - if (present(atrans)) then - call ain%cp_to_lcoo(atcoo,info) + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='mld_glob_transpose' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_r%get_context() + icomm = desc_r%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(desc_c)) then + p_desc_c => desc_c + else + p_desc_c => desc_r + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + if (present(atrans)) then + call ain%cp_to_coo(acoo,info) + else + call ain%mv_to_coo(acoo,info) + end if + + + ! + ! Compute number of entries in the + ! halo part, sorted by destination process + ! + nr = desc_r%get_local_rows() + nc = p_desc_c%get_local_cols() + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + do k=1, nzl + j = acoo%ja(k) + if (j > hlstart) then + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + sdsz(proc+1) = sdsz(proc+1) +1 + end if + end do + + ! + ! Exchange sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! number of entries was not precomputed in the descriptor, + ! which was vector-oriented and not matrix-entry-oriented + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do proc = 0, np-1 + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + Enddo + + tsdx = bsdindx + trvx = brvindx + + iszr = sum(rvsz) + iszs = sum(sdsz) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),iarcv,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),jarcv,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + ! + ! Now, transpose the matrix, then split between itself + ! and the send buffers + ! + call acoo%transp() + if (acoo%get_nzeros()/= nzl) then + write(0,*) me,'Something strange upon transpose: ',nzl,acoo%get_nzeros() + end if + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + + + nzd = 0 + do k = 1, nzl + j = acoo%ia(k) + if (j<=hlstart) then + nzd = nzd + 1 + acoo%ia(nzd) = acoo%ia(k) + acoo%ja(nzd) = acoo%ja(k) + acoo%val(nzd) = acoo%val(k) + else + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + tsdx(proc+1) = tsdx(proc+1) +1 + iasnd(tsdx(proc+1)) = acoo%ia(k) + jasnd(tsdx(proc+1)) = acoo%ja(k) + valsnd(tsdx(proc+1)) = acoo%val(k) + end if + end do + call acoo%set_nzeros(nzd) + ! + ! Put halo entries in global numbering + ! + call desc_r%indxmap%l2gip(jasnd(1:iszs),info) + call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) + ! And exchange data. + ! Normally we'll use our SIMPLE A2AV and not MPI, because + ! the communication pattern is sparse, so ours is more + ! efficient. Using ACOO for the receive buffers. + nzl = acoo%get_nzeros() + call acoo%ensure_size(nzl+iszr) + + select case(psb_get_sp_a2av_alg()) + case(psb_sp_a2av_smpl_triad_) + call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),iarcv(1:iszr),& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_smpl_v_) + call psb_simple_a2av(valsnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& + & iarcv(1:iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_mpi_) + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_dpk_,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,psb_mpi_r_dpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & iarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & jarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo + case default + info = psb_err_internal_error_ + call psb_errpush(info,name,a_err='wrong A2AV alg selector') + goto 9999 + end select + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + + if (present(desc_rx)) then + ! + ! Extend the appropriate descriptor; started as R but on + ! transpose it now describes C + ! + call desc_r%clone(desc_rx,info) + call psb_cd_reinit(desc_rx,info) + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + call desc_rx%g2lip_ins(jarcv(1:nzt),info) + call psb_cdasb(desc_rx,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + ! + ! Insert to extend descriptor + ! + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_rx%get_local_cols()) + !write(0,*) me,' Trans RX ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() else - call ain%mv_to_lcoo(atcoo,info) + ! + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + call psb_glob_to_loc(jarcv(1:iszr),desc_r,info,iact='I') + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_r%get_local_cols()) + !write(0,*) me,' Trans R- ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() end if - if (info == 0) call psb_glob_transpose(atcoo,desc_r,info,desc_c=desc_c,desc_rx=desc_rx) + +!!$ write(0,*) me,' Sanity check after rx%g2l :',count(acoo%ja(1:nzl)<0) + + + call acoo%fix(info) + nzl = acoo%get_nzeros() + if (present(atrans)) then - call atrans%mv_from_lcoo(atcoo,info) + call atrans%mv_from_coo(acoo,info) else - call ain%mv_from_lcoo(atcoo,info) + call ain%mv_from_coo(acoo,info) end if + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,& + & iarcv,jarcv,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return end subroutine psb_d_coo_glob_transpose subroutine psb_d_simple_glob_transpose_ip(ain,desc_a,info) @@ -434,7 +707,7 @@ subroutine psb_d_simple_glob_transpose_ip(ain,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_ld_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_d_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz @@ -485,7 +758,7 @@ subroutine psb_d_simple_glob_transpose(ain,aout,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_ld_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_d_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz diff --git a/base/tools/psb_dspasb.f90 b/base/tools/psb_dspasb.f90 index c2434c09..542e6901 100644 --- a/base/tools/psb_dspasb.f90 +++ b/base/tools/psb_dspasb.f90 @@ -122,11 +122,11 @@ subroutine psb_dspasb(a,desc_a, info, afmt, upd, dupl, mold) end if - IF (debug_level >= psb_debug_ext_) then + if (debug_level >= psb_debug_ext_) then ch_err=a%get_fmt() write(debug_unit, *) me,' ',trim(name),': From SPCNV',& & info,' ',ch_err - end IF + end if if (psb_errstatus_fatal()) then info=psb_err_from_subroutine_ diff --git a/base/tools/psb_s_glob_transpose.F90 b/base/tools/psb_s_glob_transpose.F90 index 9b6fd434..875f92db 100644 --- a/base/tools/psb_s_glob_transpose.F90 +++ b/base/tools/psb_s_glob_transpose.F90 @@ -113,7 +113,6 @@ subroutine psb_ls_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) integer(psb_ipk_) :: ictxt, np,me integer(psb_ipk_) :: counter,proc, err_act, j integer(psb_lpk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& - & irmin,icmin,irmax,icmax,& & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzt, nzd integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & @@ -190,7 +189,7 @@ subroutine psb_ls_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) do k=1, nzl j = acoo%ja(k) if (j > hlstart) then - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) sdsz(proc+1) = sdsz(proc+1) +1 end if end do @@ -264,19 +263,19 @@ subroutine psb_ls_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) acoo%ja(nzd) = acoo%ja(k) acoo%val(nzd) = acoo%val(k) else - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) tsdx(proc+1) = tsdx(proc+1) +1 iasnd(tsdx(proc+1)) = acoo%ia(k) jasnd(tsdx(proc+1)) = acoo%ja(k) valsnd(tsdx(proc+1)) = acoo%val(k) end if end do + call acoo%set_nzeros(nzd) ! ! Put halo entries in global numbering ! call desc_r%indxmap%l2gip(jasnd(1:iszs),info) call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) - call acoo%set_nzeros(nzd) ! And exchange data. ! Normally we'll use our SIMPLE A2AV and not MPI, because ! the communication pattern is sparse, so ours is more @@ -407,19 +406,293 @@ subroutine psb_s_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) type(psb_desc_type), intent(out), optional :: desc_rx integer(psb_ipk_), intent(out) :: info - type(psb_ls_coo_sparse_mat) :: atcoo + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc, err_act, j + integer(psb_ipk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzd + integer(psb_lpk_) :: nzt, lszr + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:), sdsz(:), tsdx(:), trvx(:) + integer(psb_ipk_), allocatable :: halo_owner(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:),iarcv(:),jarcv(:) + real(psb_spk_), allocatable :: valsnd(:) + type(psb_s_coo_sparse_mat), allocatable :: acoo + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + type(psb_desc_type), pointer :: p_desc_c + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err - if (present(atrans)) then - call ain%cp_to_lcoo(atcoo,info) + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='mld_glob_transpose' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_r%get_context() + icomm = desc_r%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(desc_c)) then + p_desc_c => desc_c + else + p_desc_c => desc_r + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + if (present(atrans)) then + call ain%cp_to_coo(acoo,info) + else + call ain%mv_to_coo(acoo,info) + end if + + + ! + ! Compute number of entries in the + ! halo part, sorted by destination process + ! + nr = desc_r%get_local_rows() + nc = p_desc_c%get_local_cols() + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + do k=1, nzl + j = acoo%ja(k) + if (j > hlstart) then + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + sdsz(proc+1) = sdsz(proc+1) +1 + end if + end do + + ! + ! Exchange sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! number of entries was not precomputed in the descriptor, + ! which was vector-oriented and not matrix-entry-oriented + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do proc = 0, np-1 + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + Enddo + + tsdx = bsdindx + trvx = brvindx + + iszr = sum(rvsz) + iszs = sum(sdsz) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),iarcv,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),jarcv,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + ! + ! Now, transpose the matrix, then split between itself + ! and the send buffers + ! + call acoo%transp() + if (acoo%get_nzeros()/= nzl) then + write(0,*) me,'Something strange upon transpose: ',nzl,acoo%get_nzeros() + end if + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + + + nzd = 0 + do k = 1, nzl + j = acoo%ia(k) + if (j<=hlstart) then + nzd = nzd + 1 + acoo%ia(nzd) = acoo%ia(k) + acoo%ja(nzd) = acoo%ja(k) + acoo%val(nzd) = acoo%val(k) + else + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + tsdx(proc+1) = tsdx(proc+1) +1 + iasnd(tsdx(proc+1)) = acoo%ia(k) + jasnd(tsdx(proc+1)) = acoo%ja(k) + valsnd(tsdx(proc+1)) = acoo%val(k) + end if + end do + call acoo%set_nzeros(nzd) + ! + ! Put halo entries in global numbering + ! + call desc_r%indxmap%l2gip(jasnd(1:iszs),info) + call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) + ! And exchange data. + ! Normally we'll use our SIMPLE A2AV and not MPI, because + ! the communication pattern is sparse, so ours is more + ! efficient. Using ACOO for the receive buffers. + nzl = acoo%get_nzeros() + call acoo%ensure_size(nzl+iszr) + + select case(psb_get_sp_a2av_alg()) + case(psb_sp_a2av_smpl_triad_) + call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),iarcv(1:iszr),& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_smpl_v_) + call psb_simple_a2av(valsnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& + & iarcv(1:iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_mpi_) + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_spk_,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,psb_mpi_r_spk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & iarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & jarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo + case default + info = psb_err_internal_error_ + call psb_errpush(info,name,a_err='wrong A2AV alg selector') + goto 9999 + end select + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + + if (present(desc_rx)) then + ! + ! Extend the appropriate descriptor; started as R but on + ! transpose it now describes C + ! + call desc_r%clone(desc_rx,info) + call psb_cd_reinit(desc_rx,info) + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + call desc_rx%g2lip_ins(jarcv(1:nzt),info) + call psb_cdasb(desc_rx,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + ! + ! Insert to extend descriptor + ! + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_rx%get_local_cols()) + !write(0,*) me,' Trans RX ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() else - call ain%mv_to_lcoo(atcoo,info) + ! + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + call psb_glob_to_loc(jarcv(1:iszr),desc_r,info,iact='I') + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_r%get_local_cols()) + !write(0,*) me,' Trans R- ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() end if - if (info == 0) call psb_glob_transpose(atcoo,desc_r,info,desc_c=desc_c,desc_rx=desc_rx) + +!!$ write(0,*) me,' Sanity check after rx%g2l :',count(acoo%ja(1:nzl)<0) + + + call acoo%fix(info) + nzl = acoo%get_nzeros() + if (present(atrans)) then - call atrans%mv_from_lcoo(atcoo,info) + call atrans%mv_from_coo(acoo,info) else - call ain%mv_from_lcoo(atcoo,info) + call ain%mv_from_coo(acoo,info) end if + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,& + & iarcv,jarcv,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return end subroutine psb_s_coo_glob_transpose subroutine psb_s_simple_glob_transpose_ip(ain,desc_a,info) @@ -434,7 +707,7 @@ subroutine psb_s_simple_glob_transpose_ip(ain,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_ls_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_s_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz @@ -485,7 +758,7 @@ subroutine psb_s_simple_glob_transpose(ain,aout,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_ls_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_s_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz diff --git a/base/tools/psb_sspasb.f90 b/base/tools/psb_sspasb.f90 index 58241a92..01187497 100644 --- a/base/tools/psb_sspasb.f90 +++ b/base/tools/psb_sspasb.f90 @@ -122,11 +122,11 @@ subroutine psb_sspasb(a,desc_a, info, afmt, upd, dupl, mold) end if - IF (debug_level >= psb_debug_ext_) then + if (debug_level >= psb_debug_ext_) then ch_err=a%get_fmt() write(debug_unit, *) me,' ',trim(name),': From SPCNV',& & info,' ',ch_err - end IF + end if if (psb_errstatus_fatal()) then info=psb_err_from_subroutine_ diff --git a/base/tools/psb_z_glob_transpose.F90 b/base/tools/psb_z_glob_transpose.F90 index 1d087bd3..8f7cadd5 100644 --- a/base/tools/psb_z_glob_transpose.F90 +++ b/base/tools/psb_z_glob_transpose.F90 @@ -113,7 +113,6 @@ subroutine psb_lz_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) integer(psb_ipk_) :: ictxt, np,me integer(psb_ipk_) :: counter,proc, err_act, j integer(psb_lpk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& - & irmin,icmin,irmax,icmax,& & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzt, nzd integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & @@ -190,7 +189,7 @@ subroutine psb_lz_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) do k=1, nzl j = acoo%ja(k) if (j > hlstart) then - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) sdsz(proc+1) = sdsz(proc+1) +1 end if end do @@ -264,19 +263,19 @@ subroutine psb_lz_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) acoo%ja(nzd) = acoo%ja(k) acoo%val(nzd) = acoo%val(k) else - call p_desc_c%indxmap%fnd_halo_owner(j,proc,info) + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) tsdx(proc+1) = tsdx(proc+1) +1 iasnd(tsdx(proc+1)) = acoo%ia(k) jasnd(tsdx(proc+1)) = acoo%ja(k) valsnd(tsdx(proc+1)) = acoo%val(k) end if end do + call acoo%set_nzeros(nzd) ! ! Put halo entries in global numbering ! call desc_r%indxmap%l2gip(jasnd(1:iszs),info) call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) - call acoo%set_nzeros(nzd) ! And exchange data. ! Normally we'll use our SIMPLE A2AV and not MPI, because ! the communication pattern is sparse, so ours is more @@ -407,19 +406,293 @@ subroutine psb_z_coo_glob_transpose(ain,desc_r,info,atrans,desc_c,desc_rx) type(psb_desc_type), intent(out), optional :: desc_rx integer(psb_ipk_), intent(out) :: info - type(psb_lz_coo_sparse_mat) :: atcoo + integer(psb_ipk_) :: ictxt, np,me + integer(psb_ipk_) :: counter,proc, err_act, j + integer(psb_ipk_) :: i, k, idx, r, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & l1, nsnds, nrcvs, nr,nc,nzl, hlstart, nzd + integer(psb_lpk_) :: nzt, lszr + integer(psb_mpk_) :: icomm, minfo + integer(psb_mpk_), allocatable :: brvindx(:), & + & rvsz(:), bsdindx(:), sdsz(:), tsdx(:), trvx(:) + integer(psb_ipk_), allocatable :: halo_owner(:) + integer(psb_lpk_), allocatable :: iasnd(:), jasnd(:),iarcv(:),jarcv(:) + complex(psb_dpk_), allocatable :: valsnd(:) + type(psb_z_coo_sparse_mat), allocatable :: acoo + logical :: rowcnv_,colcnv_,rowscale_,colscale_,outcol_glob_ + type(psb_desc_type), pointer :: p_desc_c + character(len=5) :: outfmt_ + integer(psb_ipk_) :: debug_level, debug_unit + character(len=20) :: name, ch_err - if (present(atrans)) then - call ain%cp_to_lcoo(atcoo,info) + if(psb_get_errstatus() /= 0) return + info=psb_success_ + name='mld_glob_transpose' + call psb_erractionsave(err_act) + if (psb_errstatus_fatal()) then + info = psb_err_internal_error_ ; goto 9999 + end if + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = desc_r%get_context() + icomm = desc_r%get_mpic() + + Call psb_info(ictxt, me, np) + + if (debug_level >= psb_debug_outer_) & + & write(debug_unit,*) me,' ',trim(name),': Start' + + if (present(desc_c)) then + p_desc_c => desc_c + else + p_desc_c => desc_r + end if + + Allocate(brvindx(np+1),& + & rvsz(np),sdsz(np),bsdindx(np+1), acoo,stat=info) + + if (info /= psb_success_) then + info=psb_err_alloc_dealloc_ + call psb_errpush(info,name) + goto 9999 + end if + + + sdsz(:)=0 + rvsz(:)=0 + l1 = 0 + brvindx(1) = 0 + bsdindx(1) = 0 + counter=1 + idx = 0 + idxs = 0 + idxr = 0 + + if (present(atrans)) then + call ain%cp_to_coo(acoo,info) + else + call ain%mv_to_coo(acoo,info) + end if + + + ! + ! Compute number of entries in the + ! halo part, sorted by destination process + ! + nr = desc_r%get_local_rows() + nc = p_desc_c%get_local_cols() + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + do k=1, nzl + j = acoo%ja(k) + if (j > hlstart) then + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + sdsz(proc+1) = sdsz(proc+1) +1 + end if + end do + + ! + ! Exchange sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! number of entries was not precomputed in the descriptor, + ! which was vector-oriented and not matrix-entry-oriented + ! + call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& + & rvsz,1,psb_mpi_mpk_,icomm,minfo) + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='mpi_alltoall') + goto 9999 + end if + nsnds = count(sdsz /= 0) + nrcvs = count(rvsz /= 0) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done initial alltoall',nsnds,nrcvs + + idxs = 0 + idxr = 0 + counter = 1 + Do proc = 0, np-1 + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) + Enddo + + tsdx = bsdindx + trvx = brvindx + + iszr = sum(rvsz) + iszs = sum(sdsz) + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Sizes:',& + & ' Send:',sdsz(:),' Receive:',rvsz(:) + + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),iarcv,info) + if (info == psb_success_) call psb_ensure_size(max(iszr,1),jarcv,info) + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='ensure_size') + goto 9999 + end if + + ! + ! Now, transpose the matrix, then split between itself + ! and the send buffers + ! + call acoo%transp() + if (acoo%get_nzeros()/= nzl) then + write(0,*) me,'Something strange upon transpose: ',nzl,acoo%get_nzeros() + end if + nzl = acoo%get_nzeros() + hlstart = p_desc_c%get_local_rows() + + + nzd = 0 + do k = 1, nzl + j = acoo%ia(k) + if (j<=hlstart) then + nzd = nzd + 1 + acoo%ia(nzd) = acoo%ia(k) + acoo%ja(nzd) = acoo%ja(k) + acoo%val(nzd) = acoo%val(k) + else + call p_desc_c%indxmap%qry_halo_owner(j,proc,info) + tsdx(proc+1) = tsdx(proc+1) +1 + iasnd(tsdx(proc+1)) = acoo%ia(k) + jasnd(tsdx(proc+1)) = acoo%ja(k) + valsnd(tsdx(proc+1)) = acoo%val(k) + end if + end do + call acoo%set_nzeros(nzd) + ! + ! Put halo entries in global numbering + ! + call desc_r%indxmap%l2gip(jasnd(1:iszs),info) + call p_desc_c%indxmap%l2gip(iasnd(1:iszs),info) + ! And exchange data. + ! Normally we'll use our SIMPLE A2AV and not MPI, because + ! the communication pattern is sparse, so ours is more + ! efficient. Using ACOO for the receive buffers. + nzl = acoo%get_nzeros() + call acoo%ensure_size(nzl+iszr) + + select case(psb_get_sp_a2av_alg()) + case(psb_sp_a2av_smpl_triad_) + call psb_simple_triad_a2av(valsnd,iasnd,jasnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),iarcv(1:iszr),& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_smpl_v_) + call psb_simple_a2av(valsnd,sdsz,bsdindx,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(iasnd,sdsz,bsdindx,& + & iarcv(1:iszr),rvsz,brvindx,ictxt,info) + if (info == psb_success_) call psb_simple_a2av(jasnd,sdsz,bsdindx,& + & jarcv(1:iszr),rvsz,brvindx,ictxt,info) + case(psb_sp_a2av_mpi_) + + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_dpk_,& + & acoo%val(nzl+1:nzl+iszr),rvsz,brvindx,psb_mpi_c_dpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & iarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo == mpi_success) & + & call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_lpk_,& + & jarcv(1:iszr),rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= mpi_success) info = minfo + case default + info = psb_err_internal_error_ + call psb_errpush(info,name,a_err='wrong A2AV alg selector') + goto 9999 + end select + + if (info /= psb_success_) then + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='alltoallv') + goto 9999 + end if + + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done alltoallv' + + if (present(desc_rx)) then + ! + ! Extend the appropriate descriptor; started as R but on + ! transpose it now describes C + ! + call desc_r%clone(desc_rx,info) + call psb_cd_reinit(desc_rx,info) + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + call desc_rx%g2lip_ins(jarcv(1:nzt),info) + call psb_cdasb(desc_rx,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + ! + ! Insert to extend descriptor + ! + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_rx%get_local_cols()) + !write(0,*) me,' Trans RX ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() else - call ain%mv_to_lcoo(atcoo,info) + ! + ! + ! Take out non-local rows + ! + call psb_glob_to_loc(iarcv(1:iszr),p_desc_c,info,iact='I',owned=.true.) + call psb_glob_to_loc(jarcv(1:iszr),desc_r,info,iact='I') + lszr = iszr + call psb_coo_clean_negidx_inner(lszr,iarcv(1:iszr),jarcv(1:iszr),& + & acoo%val(nzl+1:nzl+iszr),nzt,info) + acoo%ia(nzl+1:nzl+nzt) = iarcv(1:nzt) + acoo%ja(nzl+1:nzl+nzt) = jarcv(1:nzt) + nzl = nzl + nzt + call acoo%set_nzeros(nzl) + nzl = acoo%get_nzeros() + call acoo%set_sorted(.false.) + + call acoo%set_nrows(p_desc_c%get_local_rows()) + call acoo%set_ncols(desc_r%get_local_cols()) + !write(0,*) me,' Trans R- ',acoo%get_nrows(),acoo%get_ncols(),acoo%get_nzeros() end if - if (info == 0) call psb_glob_transpose(atcoo,desc_r,info,desc_c=desc_c,desc_rx=desc_rx) + +!!$ write(0,*) me,' Sanity check after rx%g2l :',count(acoo%ja(1:nzl)<0) + + + call acoo%fix(info) + nzl = acoo%get_nzeros() + if (present(atrans)) then - call atrans%mv_from_lcoo(atcoo,info) + call atrans%mv_from_coo(acoo,info) else - call ain%mv_from_lcoo(atcoo,info) + call ain%mv_from_coo(acoo,info) end if + Deallocate(brvindx,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,& + & iarcv,jarcv,stat=info) + if (debug_level >= psb_debug_outer_)& + & write(debug_unit,*) me,' ',trim(name),': Done' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return end subroutine psb_z_coo_glob_transpose subroutine psb_z_simple_glob_transpose_ip(ain,desc_a,info) @@ -434,7 +707,7 @@ subroutine psb_z_simple_glob_transpose_ip(ain,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_lz_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_z_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz @@ -485,7 +758,7 @@ subroutine psb_z_simple_glob_transpose(ain,aout,desc_a,info) ! that the same DESC_A works for both A and A^T, which ! essentially means that A has a symmetric pattern. ! - type(psb_lz_coo_sparse_mat) :: tmpc1, tmpc2 + type(psb_z_coo_sparse_mat) :: tmpc1, tmpc2 integer(psb_ipk_) :: nz1, nz2, nzh, nz integer(psb_ipk_) :: ictxt, me, np integer(psb_lpk_) :: i, j, k, nrow, ncol, nlz diff --git a/base/tools/psb_zspasb.f90 b/base/tools/psb_zspasb.f90 index 9db28550..6cdfc61f 100644 --- a/base/tools/psb_zspasb.f90 +++ b/base/tools/psb_zspasb.f90 @@ -122,11 +122,11 @@ subroutine psb_zspasb(a,desc_a, info, afmt, upd, dupl, mold) end if - IF (debug_level >= psb_debug_ext_) then + if (debug_level >= psb_debug_ext_) then ch_err=a%get_fmt() write(debug_unit, *) me,' ',trim(name),': From SPCNV',& & info,' ',ch_err - end IF + end if if (psb_errstatus_fatal()) then info=psb_err_from_subroutine_ diff --git a/docs/html/userhtmlsu2.html b/docs/html/userhtmlsu2.html index 9c452948..1a266fc2 100644 --- a/docs/html/userhtmlsu2.html +++ b/docs/html/userhtmlsu2.html @@ -114,7 +114,7 @@ class="description">Each process has its own value(s) independently. src="userhtml0x.png" alt="psb_version_string_ " class="math-display" >

whose current value is 3.4.0 +class="cmtt-10">3.7.0 diff --git a/docs/html/userhtmlsu3.html b/docs/html/userhtmlsu3.html index 15721cf1..e01fd793 100644 --- a/docs/html/userhtmlsu3.html +++ b/docs/html/userhtmlsu3.html @@ -128,11 +128,13 @@ href="userhtml10.html#fn3x0">3 .

  • Call the iterative method of choice, e.g. psb_bicgstab
  • -

    This is the structure of the sample programs in the directory Call the iterative driver psb_krylov with the method of choice, e.g. + bicgstab. +

    This is the structure of the sample programs in the directory test/pargen/. -

    For a simulation in which the same discretization mesh is used over multiple time +

    For a simulation in which the same discretization mesh is used over multiple time steps, the following structure may be more appropriate:

    1. prec%build class="enumerate" id="x9-6043x5">Call the iterative method of choice, e.g. psb_bicgstab
    -

    The insertion routines will be called as many times as needed; they only need to be +

    The insertion routines will be called as many times as needed; they only need to be called on the data that is actually allocated to the current process, i.e. each process generates its own data. -

    In principle there is no specific order in the calls to

    In principle there is no specific order in the calls to psb_spins, nor is there a requirement to build a matrix row in its entirety before calling the routine; this allows the application programmer to walk through the discretization mesh element by element, generating the main part of a given matrix row but also contributions to the rows corresponding to neighbouring elements. -

    From a functional point of view it is even possible to execute one call for each +

    From a functional point of view it is even possible to execute one call for each nonzero coefficient; however this would have a substantial computational overhead. It is therefore advisable to pack a certain amount of data into each call to the insertion routine, say touching on a few tens of rows; the best @@ -209,10 +211,10 @@ process and pass it in a single call to psb_spins; this, however, would entail a doubling of memory occupation, and thus would be almost always far from optimal. -

    +

    2.3.1 User-defined index mappings
    -

    PSBLAS supports user-defined global to local index mappings, subject to the +

    PSBLAS supports user-defined global to local index mappings, subject to the constraints outlined in sec. 2.3:

      @@ -230,7 +232,7 @@ class="cmmi-10">…ncol i;
    -

    but otherwise the mapping is arbitrary. The user application is responsible to ensure +

    but otherwise the mapping is arbitrary. The user application is responsible to ensure consistency of this mapping; some errors may be caught by the library, but this is not guaranteed. The application structure to support this usage is as follows: @@ -271,12 +273,12 @@ class="cmtt-10">irw, respectively, are already local indice -