From 7461cd9c5eb730fba55c73de3ca237368bba527b Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Tue, 23 Sep 2008 11:14:37 +0000 Subject: [PATCH] Merged encapsluation fixes in desc_type from trunk. --- base/comm/Makefile | 2 +- base/comm/psb_cgather.f90 | 12 +- base/comm/psb_cscatter.F90 | 435 ++++++++++++++++ base/comm/psb_dgather.f90 | 10 +- base/comm/psb_dscatter.F90 | 8 +- base/comm/psb_igather.f90 | 8 +- base/comm/psb_iscatter.F90 | 8 +- base/comm/psb_sgather.f90 | 8 +- base/comm/psb_sscatter.F90 | 435 ++++++++++++++++ base/comm/psb_zgather.f90 | 8 +- base/comm/psb_zscatter.F90 | 8 +- base/internals/Makefile | 2 +- .../{psi_bld_hash.f90 => psi_bld_g2lmap.f90} | 53 +- base/internals/psi_bld_tmphalo.f90 | 7 +- base/internals/psi_desc_index.F90 | 12 +- base/internals/psi_extrct_dl.F90 | 1 + base/internals/psi_idx_cnv.f90 | 20 +- base/internals/psi_idx_ins_cnv.f90 | 40 +- base/internals/psi_ldsc_pre_halo.f90 | 18 +- base/internals/psi_sort_dl.f90 | 2 +- base/modules/psb_comm_mod.f90 | 32 ++ base/modules/psb_desc_type.f90 | 469 +++++++++++++----- base/modules/psb_hash_mod.f90 | 48 +- base/modules/psi_mod.f90 | 14 +- base/tools/psb_ccdbldext.F90 | 62 ++- base/tools/psb_cd_inloc.f90 | 18 +- base/tools/psb_cd_set_bld.f90 | 5 +- base/tools/psb_cdals.f90 | 26 +- base/tools/psb_cdalv.f90 | 24 +- base/tools/psb_cdcpy.f90 | 31 +- base/tools/psb_cdprt.f90 | 26 +- base/tools/psb_cdren.f90 | 6 +- base/tools/psb_cdrep.f90 | 10 +- base/tools/psb_cspins.f90 | 6 +- base/tools/psb_dcdbldext.F90 | 60 ++- base/tools/psb_dspins.f90 | 6 +- base/tools/psb_icdasb.F90 | 4 +- base/tools/psb_loc_to_glob.f90 | 53 +- base/tools/psb_scdbldext.F90 | 62 ++- base/tools/psb_sspins.f90 | 6 +- base/tools/psb_zcdbldext.F90 | 62 ++- base/tools/psb_zspins.f90 | 6 +- 42 files changed, 1641 insertions(+), 492 deletions(-) create mode 100644 base/comm/psb_cscatter.F90 create mode 100644 base/comm/psb_sscatter.F90 rename base/internals/{psi_bld_hash.f90 => psi_bld_g2lmap.f90} (77%) diff --git a/base/comm/Makefile b/base/comm/Makefile index 8d50528d..c6c97f25 100644 --- a/base/comm/Makefile +++ b/base/comm/Makefile @@ -6,7 +6,7 @@ OBJS = psb_dgather.o psb_dhalo.o psb_dovrl.o \ psb_cgather.o psb_chalo.o psb_covrl.o \ psb_zgather.o psb_zhalo.o psb_zovrl.o -MPFOBJS=psb_dscatter.o psb_zscatter.o psb_iscatter.o +MPFOBJS=psb_dscatter.o psb_zscatter.o psb_iscatter.o psb_cscatter.o psb_sscatter.o LIBDIR=.. MODDIR=../modules FINCLUDES=$(FMFLAG)$(LIBDIR) $(FMFLAG)$(MODDIR) $(FMFLAG). diff --git a/base/comm/psb_cgather.f90 b/base/comm/psb_cgather.f90 index 850f4f0c..93e6ce64 100644 --- a/base/comm/psb_cgather.f90 +++ b/base/comm/psb_cgather.f90 @@ -139,7 +139,7 @@ subroutine psb_cgatherm(globx, locx, desc_a, info, iroot) do j=1,k do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx,jglobx+j-1) = locx(i,jlx+j-1) end do end do @@ -149,8 +149,8 @@ subroutine psb_cgatherm(globx, locx, desc_a, info, iroot) do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) - globx(idx,jglobx+j-1) = zzero + idx = desc_a%idxmap%loc_to_glob(idx) + globx(idx,jglobx+j-1) = czero end if end do end do @@ -304,15 +304,15 @@ subroutine psb_cgatherv(globx, locx, desc_a, info, iroot) globx(:)=0.d0 do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx) = locx(i) end do ! adjust overlapped elements do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) - globx(idx) = dzero + idx = desc_a%idxmap%loc_to_glob(idx) + globx(idx) = czero end if end do call psb_sum(ictxt,globx(1:m),root=root) diff --git a/base/comm/psb_cscatter.F90 b/base/comm/psb_cscatter.F90 new file mode 100644 index 00000000..bc1b3809 --- /dev/null +++ b/base/comm/psb_cscatter.F90 @@ -0,0 +1,435 @@ +!!$ +!!$ Parallel Sparse BLAS version 2.2 +!!$ (C) Copyright 2006/2007/2008 +!!$ Salvatore Filippone University of Rome Tor Vergata +!!$ Alfredo Buttari University of Rome Tor Vergata +!!$ +!!$ Redistribution and use in source and binary forms, with or without +!!$ modification, are permitted provided that the following conditions +!!$ are met: +!!$ 1. Redistributions of source code must retain the above copyright +!!$ notice, this list of conditions and the following disclaimer. +!!$ 2. Redistributions in binary form must reproduce the above copyright +!!$ notice, this list of conditions, and the following disclaimer in the +!!$ documentation and/or other materials provided with the distribution. +!!$ 3. The name of the PSBLAS group or the names of its contributors may +!!$ not be used to endorse or promote products derived from this +!!$ software without specific written permission. +!!$ +!!$ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +!!$ ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +!!$ TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +!!$ PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PSBLAS GROUP OR ITS CONTRIBUTORS +!!$ BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +!!$ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +!!$ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +!!$ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +!!$ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +!!$ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +!!$ POSSIBILITY OF SUCH DAMAGE. +!!$ +!!$ +! File: psb_cscatter.f90 +! +! Subroutine: psb_cscatterm +! This subroutine scatters a global matrix locally owned by one process +! into pieces that are local to alle the processes. +! +! Arguments: +! globx - complex,dimension(:,:). The global matrix to scatter. +! locx - complex,dimension(:,:). The local piece of the distributed matrix. +! desc_a - type(psb_desc_type). The communication descriptor. +! info - integer. Error code. +! iroot - integer(optional). The process that owns the global matrix. +! If -1 all the processes have a copy. +! Default -1 +subroutine psb_cscatterm(globx, locx, desc_a, info, iroot) + + use psb_descriptor_type + use psb_check_mod + use psb_error_mod +#ifdef MPI_MOD + use mpi +#endif + use psb_penv_mod + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + complex(psb_spk_), intent(out) :: locx(:,:) + complex(psb_spk_), intent(in) :: globx(:,:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: iroot + + + ! locals + integer :: int_err(5), ictxt, np, me,& + & err_act, m, n, i, j, idx, nrow, iiroot, iglobx, jglobx,& + & ilocx, jlocx, lda_locx, lda_globx, lock, globk, icomm, k, maxk, root, ilx,& + & jlx, myrank, rootrank, c, pos + complex(psb_spk_),allocatable :: scatterv(:) + integer, allocatable :: displ(:), l_t_g_all(:), all_dim(:) + character(len=20) :: name, ch_err + + name='psb_scatterm' + if(psb_get_errstatus() /= 0) return + info=0 + call psb_erractionsave(err_act) + + ictxt=psb_cd_get_context(desc_a) + + ! check on blacs grid + call psb_info(ictxt, me, np) + if (np == -1) then + info = 2010 + call psb_errpush(info,name) + goto 9999 + endif + + if (present(iroot)) then + root = iroot + if((root < -1).or.(root > np)) then + info=30 + int_err(1:2)=(/5,root/) + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + else + root = -1 + end if + if (root == -1) then + iiroot = psb_root_ + endif + + iglobx = 1 + jglobx = 1 + ilocx = 1 + jlocx = 1 + lda_globx = size(globx,1) + lda_locx = size(locx, 1) + + m = psb_cd_get_global_rows(desc_a) + n = psb_cd_get_global_cols(desc_a) + + lock=size(locx,2)-jlocx+1 + globk=size(globx,2)-jglobx+1 + maxk=min(lock,globk) + k = maxk + call psb_get_mpicomm(ictxt,icomm) + call psb_get_rank(myrank,ictxt,me) + + + lda_globx = size(globx) + lda_locx = size(locx) + + m = psb_cd_get_global_rows(desc_a) + n = psb_cd_get_global_cols(desc_a) + + call psb_bcast(ictxt,k,root=iiroot) + + ! there should be a global check on k here!!! + + call psb_chkglobvect(m,n,size(globx),iglobx,jglobx,desc_a,info) + if (info == 0) call psb_chkvect(m,n,size(locx),ilocx,jlocx,desc_a,info,ilx,jlx) + if(info /= 0) then + info=4010 + ch_err='psb_chk(glob)vect' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if ((ilx /= 1).or.(iglobx /= 1)) then + info=3040 + call psb_errpush(info,name) + goto 9999 + end if + + nrow=psb_cd_get_local_rows(desc_a) + + if ((root == -1).or.(np==1)) then + ! extract my chunk + do j=1,k + do i=1, nrow + idx = desc_a%idxmap%loc_to_glob(i) + locx(i,jlocx+j-1)=globx(idx,jglobx+j-1) + end do + end do + else + call psb_get_rank(rootrank,ictxt,root) + + ! root has to gather size information + allocate(displ(np),all_dim(np),stat=info) + if(info /= 0) then + info=4010 + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + call mpi_gather(nrow,1,mpi_integer,all_dim,& + & 1,mpi_integer,rootrank,icomm,info) + + if (me == root) then + displ(1)=0 + do i=2,np + displ(i)=displ(i-1)+all_dim(i-1) + end do + + ! root has to gather loc_glob from each process + allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim)),stat=info) + if(info /= 0) then + info=4010 + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + end if + + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& + & mpi_integer,l_t_g_all,all_dim,& + & displ,mpi_integer,rootrank,icomm,info) + + + do c=1, k + ! prepare vector to scatter + if(me == root) then + do i=1,np + pos=displ(i) + do j=1, all_dim(i) + idx=l_t_g_all(pos+j) + scatterv(pos+j)=globx(idx,jglobx+c-1) + end do + end do + end if + + ! scatter !!! + call mpi_scatterv(scatterv,all_dim,displ,& + & mpi_complex,locx(1,jlocx+c-1),nrow,& + & mpi_complex,rootrank,icomm,info) + + end do + + if (me==root) deallocate(all_dim, l_t_g_all, displ, scatterv) + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return + +end subroutine psb_cscatterm + + + + +!!$ +!!$ Parallel Sparse BLAS version 2.2 +!!$ (C) Copyright 2006/2007/2008 +!!$ Salvatore Filippone University of Rome Tor Vergata +!!$ Alfredo Buttari University of Rome Tor Vergata +!!$ +!!$ Redistribution and use in source and binary forms, with or without +!!$ modification, are permitted provided that the following conditions +!!$ are met: +!!$ 1. Redistributions of source code must retain the above copyright +!!$ notice, this list of conditions and the following disclaimer. +!!$ 2. Redistributions in binary form must reproduce the above copyright +!!$ notice, this list of conditions, and the following disclaimer in the +!!$ documentation and/or other materials provided with the distribution. +!!$ 3. The name of the PSBLAS group or the names of its contributors may +!!$ not be used to endorse or promote products derived from this +!!$ software without specific written permission. +!!$ +!!$ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +!!$ ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +!!$ TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +!!$ PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PSBLAS GROUP OR ITS CONTRIBUTORS +!!$ BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +!!$ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +!!$ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +!!$ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +!!$ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +!!$ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +!!$ POSSIBILITY OF SUCH DAMAGE. +!!$ +!!$ + +! Subroutine: psb_cscatterv +! This subroutine scatters a global vector locally owned by one process +! into pieces that are local to alle the processes. +! +! Arguments: +! globx - complex,dimension(:). The global vector to scatter. +! locx - complex,dimension(:). The local piece of the ditributed vector. +! desc_a - type(psb_desc_type). The communication descriptor. +! info - integer. Return code +! iroot - integer(optional). The process that owns the global vector. If -1 all +! the processes have a copy. +! +subroutine psb_cscatterv(globx, locx, desc_a, info, iroot) + use psb_descriptor_type + use psb_check_mod + use psb_error_mod +#ifdef MPI_MOD + use mpi +#endif + use psb_penv_mod + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + complex(psb_spk_), intent(out) :: locx(:) + complex(psb_spk_), intent(in) :: globx(:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: iroot + + + ! locals + integer :: int_err(5), ictxt, np, me, & + & err_act, m, n, i, j, idx, nrow, iglobx, jglobx,& + & ilocx, jlocx, lda_locx, lda_globx, root, k, icomm, myrank,& + & rootrank, pos, ilx, jlx + complex(psb_spk_), allocatable :: scatterv(:) + integer, allocatable :: displ(:), l_t_g_all(:), all_dim(:) + character(len=20) :: name, ch_err + integer :: debug_level, debug_unit + + name='psb_scatterv' + if (psb_get_errstatus() /= 0) return + info=0 + call psb_erractionsave(err_act) + ictxt=psb_cd_get_context(desc_a) + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + + ! check on blacs grid + call psb_info(ictxt, me, np) + if (np == -1) then + info = 2010 + call psb_errpush(info,name) + goto 9999 + endif + + if (present(iroot)) then + root = iroot + if((root < -1).or.(root > np)) then + info=30 + int_err(1:2)=(/5,root/) + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + else + root = -1 + end if + + call psb_get_mpicomm(ictxt,icomm) + call psb_get_rank(myrank,ictxt,me) + + iglobx = 1 + jglobx = 1 + ilocx = 1 + jlocx = 1 + lda_globx = size(globx) + lda_locx = size(locx) + + m = psb_cd_get_global_rows(desc_a) + n = psb_cd_get_global_cols(desc_a) + + k = 1 + ! there should be a global check on k here!!! + + call psb_chkglobvect(m,n,size(globx),iglobx,jglobx,desc_a,info) + if (info == 0) & + & call psb_chkvect(m,n,size(locx),ilocx,jlocx,desc_a,info,ilx,jlx) + if(info /= 0) then + info=4010 + ch_err='psb_chk(glob)vect' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if ((ilx /= 1).or.(iglobx /= 1)) then + info=3040 + call psb_errpush(info,name) + goto 9999 + end if + + nrow = psb_cd_get_local_rows(desc_a) + + if ((root == -1).or.(np==1)) then + ! extract my chunk + do i=1, nrow + idx=desc_a%idxmap%loc_to_glob(i) + locx(i)=globx(idx) + end do + else + call psb_get_rank(rootrank,ictxt,root) + + ! root has to gather size information + allocate(displ(np),all_dim(np)) + + call mpi_gather(nrow,1,mpi_integer,all_dim,& + & 1,mpi_integer,rootrank,icomm,info) + + if(me == root) then + displ(1)=0 + do i=2,np + displ(i)=displ(i-1) + all_dim(i-1) + end do + if (debug_level >= psb_debug_inner_) then + write(debug_unit,*) me,' ',trim(name),' displ:',displ(1:np), & + &' dim',all_dim(1:np), sum(all_dim) + endif + + ! root has to gather loc_glob from each process + allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim))) + end if + + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& + & mpi_integer,l_t_g_all,all_dim,& + & displ,mpi_integer,rootrank,icomm,info) + + ! prepare vector to scatter + if (me == root) then + do i=1,np + pos=displ(i) + do j=1, all_dim(i) + idx=l_t_g_all(pos+j) + scatterv(pos+j)=globx(idx) + + end do + end do + end if + + call mpi_scatterv(scatterv,all_dim,displ,& + & mpi_complex,locx,nrow,& + & mpi_complex,rootrank,icomm,info) + + if (me==root) deallocate(all_dim, l_t_g_all, displ, scatterv) + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return + +end subroutine psb_cscatterv diff --git a/base/comm/psb_dgather.f90 b/base/comm/psb_dgather.f90 index 1a678946..150cf599 100644 --- a/base/comm/psb_dgather.f90 +++ b/base/comm/psb_dgather.f90 @@ -137,7 +137,7 @@ subroutine psb_dgatherm(globx, locx, desc_a, info, iroot) do j=1,k do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx,jglobx+j-1) = locx(i,jlx+j-1) end do end do @@ -146,7 +146,7 @@ subroutine psb_dgatherm(globx, locx, desc_a, info, iroot) do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx,jglobx+j-1) = dzero end if end do @@ -300,15 +300,15 @@ subroutine psb_dgatherv(globx, locx, desc_a, info, iroot) globx(:)=0.d0 do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) - globx(idx) = locx(i) + idx = desc_a%idxmap%loc_to_glob(i) + globx(idx) = locx(i) end do ! adjust overlapped elements do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx) = dzero end if end do diff --git a/base/comm/psb_dscatter.F90 b/base/comm/psb_dscatter.F90 index c3b272fd..e5750b3a 100644 --- a/base/comm/psb_dscatter.F90 +++ b/base/comm/psb_dscatter.F90 @@ -152,7 +152,7 @@ subroutine psb_dscatterm(globx, locx, desc_a, info, iroot) ! extract my chunk do j=1,k do i=1, nrow - idx=desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) locx(i,jlocx+j-1)=globx(idx,jglobx+j-1) end do end do @@ -187,7 +187,7 @@ subroutine psb_dscatterm(globx, locx, desc_a, info, iroot) end if - call mpi_gatherv(desc_a%loc_to_glob,nrow,& + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& & mpi_integer,l_t_g_all,all_dim,& & displ,mpi_integer,rootrank,icomm,info) @@ -371,7 +371,7 @@ subroutine psb_dscatterv(globx, locx, desc_a, info, iroot) if ((root == -1).or.(np==1)) then ! extract my chunk do i=1, nrow - idx=desc_a%loc_to_glob(i) + idx=desc_a%idxmap%loc_to_glob(i) locx(i)=globx(idx) end do else @@ -397,7 +397,7 @@ subroutine psb_dscatterv(globx, locx, desc_a, info, iroot) allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim))) end if - call mpi_gatherv(desc_a%loc_to_glob,nrow,& + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& & mpi_integer,l_t_g_all,all_dim,& & displ,mpi_integer,rootrank,icomm,info) diff --git a/base/comm/psb_igather.f90 b/base/comm/psb_igather.f90 index 9f4e2002..b08d4428 100644 --- a/base/comm/psb_igather.f90 +++ b/base/comm/psb_igather.f90 @@ -137,7 +137,7 @@ subroutine psb_igatherm(globx, locx, desc_a, info, iroot) do j=1,k do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx,jglobx+j-1) = locx(i,jlx+j-1) end do end do @@ -146,7 +146,7 @@ subroutine psb_igatherm(globx, locx, desc_a, info, iroot) do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx,jglobx+j-1) = izero end if end do @@ -300,14 +300,14 @@ subroutine psb_igatherv(globx, locx, desc_a, info, iroot) globx(:)=0 do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx) = locx(i) end do ! adjust overlapped elements do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx) = dzero end if end do diff --git a/base/comm/psb_iscatter.F90 b/base/comm/psb_iscatter.F90 index 95308f7f..f003c8f3 100644 --- a/base/comm/psb_iscatter.F90 +++ b/base/comm/psb_iscatter.F90 @@ -151,7 +151,7 @@ subroutine psb_iscatterm(globx, locx, desc_a, info, iroot) ! extract my chunk do j=1,k do i=1, nrow - idx=desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) locx(i,jlocx+j-1)=globx(idx,jglobx+j-1) end do end do @@ -186,7 +186,7 @@ subroutine psb_iscatterm(globx, locx, desc_a, info, iroot) end if - call mpi_gatherv(desc_a%loc_to_glob,nrow,& + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& & mpi_integer,l_t_g_all,all_dim,& & displ,mpi_integer,rootrank,icomm,info) @@ -370,7 +370,7 @@ subroutine psb_iscatterv(globx, locx, desc_a, info, iroot) if ((root == -1).or.(np==1)) then ! extract my chunk do i=1, nrow - idx=desc_a%loc_to_glob(i) + idx=desc_a%idxmap%loc_to_glob(i) locx(i)=globx(idx) end do else @@ -396,7 +396,7 @@ subroutine psb_iscatterv(globx, locx, desc_a, info, iroot) allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim))) end if - call mpi_gatherv(desc_a%loc_to_glob,nrow,& + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& & mpi_integer,l_t_g_all,all_dim,& & displ,mpi_integer,rootrank,icomm,info) diff --git a/base/comm/psb_sgather.f90 b/base/comm/psb_sgather.f90 index c9f6b66d..97837b6c 100644 --- a/base/comm/psb_sgather.f90 +++ b/base/comm/psb_sgather.f90 @@ -137,7 +137,7 @@ subroutine psb_sgatherm(globx, locx, desc_a, info, iroot) do j=1,k do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx,jglobx+j-1) = locx(i,jlx+j-1) end do end do @@ -146,7 +146,7 @@ subroutine psb_sgatherm(globx, locx, desc_a, info, iroot) do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx,jglobx+j-1) = szero end if end do @@ -300,7 +300,7 @@ subroutine psb_sgatherv(globx, locx, desc_a, info, iroot) globx(:)=0.d0 do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx) = locx(i) end do @@ -308,7 +308,7 @@ subroutine psb_sgatherv(globx, locx, desc_a, info, iroot) do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx) = szero end if end do diff --git a/base/comm/psb_sscatter.F90 b/base/comm/psb_sscatter.F90 new file mode 100644 index 00000000..d3d98b9a --- /dev/null +++ b/base/comm/psb_sscatter.F90 @@ -0,0 +1,435 @@ +!!$ +!!$ Parallel Sparse BLAS version 2.2 +!!$ (C) Copyright 2006/2007/2008 +!!$ Salvatore Filippone University of Rome Tor Vergata +!!$ Alfredo Buttari University of Rome Tor Vergata +!!$ +!!$ Redistribution and use in source and binary forms, with or without +!!$ modification, are permitted provided that the following conditions +!!$ are met: +!!$ 1. Redistributions of source code must retain the above copyright +!!$ notice, this list of conditions and the following disclaimer. +!!$ 2. Redistributions in binary form must reproduce the above copyright +!!$ notice, this list of conditions, and the following disclaimer in the +!!$ documentation and/or other materials provided with the distribution. +!!$ 3. The name of the PSBLAS group or the names of its contributors may +!!$ not be used to endorse or promote products derived from this +!!$ software without specific written permission. +!!$ +!!$ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +!!$ ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +!!$ TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +!!$ PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PSBLAS GROUP OR ITS CONTRIBUTORS +!!$ BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +!!$ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +!!$ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +!!$ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +!!$ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +!!$ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +!!$ POSSIBILITY OF SUCH DAMAGE. +!!$ +!!$ +! File: psb_sscatter.f90 +! +! Subroutine: psb_sscatterm +! This subroutine scatters a global matrix locally owned by one process +! into pieces that are local to alle the processes. +! +! Arguments: +! globx - real,dimension(:,:). The global matrix to scatter. +! locx - real,dimension(:,:). The local piece of the ditributed matrix. +! desc_a - type(psb_desc_type). The communication descriptor. +! info - integer. Error code. +! iroot - integer(optional). The process that owns the global matrix. If -1 all +! the processes have a copy. Default -1. +! +subroutine psb_sscatterm(globx, locx, desc_a, info, iroot) + + use psb_descriptor_type + use psb_check_mod + use psb_error_mod +#ifdef MPI_MOD + use mpi +#endif + use psb_penv_mod + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + real(psb_spk_), intent(out) :: locx(:,:) + real(psb_spk_), intent(in) :: globx(:,:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: iroot + + + ! locals + integer :: int_err(5), ictxt, np, me,& + & err_act, m, n, i, j, idx, nrow, iiroot, iglobx, jglobx,& + & ilocx, jlocx, lda_locx, lda_globx, lock, globk, icomm, k, maxk, root, ilx,& + & jlx, myrank, rootrank, c, pos + real(psb_spk_), allocatable :: scatterv(:) + integer, allocatable :: displ(:), l_t_g_all(:), all_dim(:) + character(len=20) :: name, ch_err + + name='psb_scatterm' + if(psb_get_errstatus() /= 0) return + info=0 + call psb_erractionsave(err_act) + + ictxt=psb_cd_get_context(desc_a) + + ! check on blacs grid + call psb_info(ictxt, me, np) + if (np == -1) then + info = 2010 + call psb_errpush(info,name) + goto 9999 + endif + + if (present(iroot)) then + root = iroot + if((root < -1).or.(root > np)) then + info=30 + int_err(1:2)=(/5,root/) + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + else + root = -1 + end if + if (root == -1) then + iiroot = psb_root_ + endif + + iglobx = 1 + jglobx = 1 + ilocx = 1 + jlocx = 1 + lda_globx = size(globx,1) + lda_locx = size(locx, 1) + + m = psb_cd_get_global_rows(desc_a) + n = psb_cd_get_global_cols(desc_a) + + lock=size(locx,2)-jlocx+1 + globk=size(globx,2)-jglobx+1 + maxk=min(lock,globk) + k = maxk + call psb_get_mpicomm(ictxt,icomm) + call psb_get_rank(myrank,ictxt,me) + + + lda_globx = size(globx) + lda_locx = size(locx) + + m = psb_cd_get_global_rows(desc_a) + n = psb_cd_get_global_cols(desc_a) + + call psb_bcast(ictxt,k,root=iiroot) + + ! there should be a global check on k here!!! + + call psb_chkglobvect(m,n,size(globx),iglobx,jglobx,desc_a,info) + if (info == 0) call psb_chkvect(m,n,size(locx),ilocx,jlocx,desc_a,info,ilx,jlx) + if(info /= 0) then + info=4010 + ch_err='psb_chk(glob)vect' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if ((ilx /= 1).or.(iglobx /= 1)) then + info=3040 + call psb_errpush(info,name) + goto 9999 + end if + + nrow=psb_cd_get_local_rows(desc_a) + + if ((root == -1).or.(np==1)) then + ! extract my chunk + do j=1,k + do i=1, nrow + idx = desc_a%idxmap%loc_to_glob(i) + locx(i,jlocx+j-1)=globx(idx,jglobx+j-1) + end do + end do + else + call psb_get_rank(rootrank,ictxt,root) + + ! root has to gather size information + allocate(displ(np),all_dim(np),stat=info) + if(info /= 0) then + info=4010 + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + call mpi_gather(nrow,1,mpi_integer,all_dim,& + & 1,mpi_integer,rootrank,icomm,info) + + if (me == root) then + displ(1)=0 + do i=2,np + displ(i)=displ(i-1)+all_dim(i-1) + end do + + ! root has to gather loc_glob from each process + allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim)),stat=info) + if(info /= 0) then + info=4010 + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + end if + + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& + & mpi_integer,l_t_g_all,all_dim,& + & displ,mpi_integer,rootrank,icomm,info) + + + do c=1, k + ! prepare vector to scatter + if(me == root) then + do i=1,np + pos=displ(i) + do j=1, all_dim(i) + idx=l_t_g_all(pos+j) + scatterv(pos+j)=globx(idx,jglobx+c-1) + end do + end do + end if + + ! scatter !!! + call mpi_scatterv(scatterv,all_dim,displ,& + & mpi_real,locx(1,jlocx+c-1),nrow,& + & mpi_real,rootrank,icomm,info) + + end do + + if (me==root) deallocate(all_dim, l_t_g_all, displ, scatterv) + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return + +end subroutine psb_sscatterm + + + + +!!$ +!!$ Parallel Sparse BLAS version 2.2 +!!$ (C) Copyright 2006/2007/2008 +!!$ Salvatore Filippone University of Rome Tor Vergata +!!$ Alfredo Buttari University of Rome Tor Vergata +!!$ +!!$ Redistribution and use in source and binary forms, with or without +!!$ modification, are permitted provided that the following conditions +!!$ are met: +!!$ 1. Redistributions of source code must retain the above copyright +!!$ notice, this list of conditions and the following disclaimer. +!!$ 2. Redistributions in binary form must reproduce the above copyright +!!$ notice, this list of conditions, and the following disclaimer in the +!!$ documentation and/or other materials provided with the distribution. +!!$ 3. The name of the PSBLAS group or the names of its contributors may +!!$ not be used to endorse or promote products derived from this +!!$ software without specific written permission. +!!$ +!!$ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +!!$ ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +!!$ TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +!!$ PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PSBLAS GROUP OR ITS CONTRIBUTORS +!!$ BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +!!$ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +!!$ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +!!$ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +!!$ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +!!$ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +!!$ POSSIBILITY OF SUCH DAMAGE. +!!$ +!!$ + +! Subroutine: psb_sscatterv +! This subroutine scatters a global vector locally owned by one process +! into pieces that are local to alle the processes. +! +! Arguments: +! globx - real,dimension(:). The global vector to scatter. +! locx - real,dimension(:). The local piece of the ditributed vector. +! desc_a - type(psb_desc_type). The communication descriptor. +! info - integer. Error code. +! iroot - integer(optional). The process that owns the global vector. If -1 all +! the processes have a copy. +! +subroutine psb_sscatterv(globx, locx, desc_a, info, iroot) + use psb_descriptor_type + use psb_check_mod + use psb_error_mod +#ifdef MPI_MOD + use mpi +#endif + use psb_penv_mod + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + real(psb_spk_), intent(out) :: locx(:) + real(psb_spk_), intent(in) :: globx(:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: iroot + + + ! locals + integer :: int_err(5), ictxt, np, me, & + & err_act, m, n, i, j, idx, nrow, iglobx, jglobx,& + & ilocx, jlocx, lda_locx, lda_globx, root, k, icomm, myrank,& + & rootrank, pos, ilx, jlx + real(psb_spk_), allocatable :: scatterv(:) + integer, allocatable :: displ(:), l_t_g_all(:), all_dim(:) + character(len=20) :: name, ch_err + integer :: debug_level, debug_unit + + name='psb_scatterv' + if (psb_get_errstatus() /= 0) return + info=0 + call psb_erractionsave(err_act) + ictxt=psb_cd_get_context(desc_a) + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + + ! check on blacs grid + call psb_info(ictxt, me, np) + if (np == -1) then + info = 2010 + call psb_errpush(info,name) + goto 9999 + endif + + if (present(iroot)) then + root = iroot + if((root < -1).or.(root > np)) then + info=30 + int_err(1:2)=(/5,root/) + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + else + root = -1 + end if + + call psb_get_mpicomm(ictxt,icomm) + call psb_get_rank(myrank,ictxt,me) + + iglobx = 1 + jglobx = 1 + ilocx = 1 + jlocx = 1 + lda_globx = size(globx) + lda_locx = size(locx) + + m = psb_cd_get_global_rows(desc_a) + n = psb_cd_get_global_cols(desc_a) + + k = 1 + ! there should be a global check on k here!!! + + call psb_chkglobvect(m,n,size(globx),iglobx,jglobx,desc_a,info) + if (info == 0) & + & call psb_chkvect(m,n,size(locx),ilocx,jlocx,desc_a,info,ilx,jlx) + if(info /= 0) then + info=4010 + ch_err='psb_chk(glob)vect' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if ((ilx /= 1).or.(iglobx /= 1)) then + info=3040 + call psb_errpush(info,name) + goto 9999 + end if + + nrow = psb_cd_get_local_rows(desc_a) + + if ((root == -1).or.(np==1)) then + ! extract my chunk + do i=1, nrow + idx=desc_a%idxmap%loc_to_glob(i) + locx(i)=globx(idx) + end do + else + call psb_get_rank(rootrank,ictxt,root) + + ! root has to gather size information + allocate(displ(np),all_dim(np)) + + call mpi_gather(nrow,1,mpi_integer,all_dim,& + & 1,mpi_integer,rootrank,icomm,info) + + if(me == root) then + displ(1)=0 + do i=2,np + displ(i)=displ(i-1) + all_dim(i-1) + end do + if (debug_level >= psb_debug_inner_) then + write(debug_unit,*) me,' ',trim(name),' displ:',displ(1:np), & + &' dim',all_dim(1:np), sum(all_dim) + endif + + ! root has to gather loc_glob from each process + allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim))) + end if + + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& + & mpi_integer,l_t_g_all,all_dim,& + & displ,mpi_integer,rootrank,icomm,info) + + ! prepare vector to scatter + if (me == root) then + do i=1,np + pos=displ(i) + do j=1, all_dim(i) + idx=l_t_g_all(pos+j) + scatterv(pos+j)=globx(idx) + + end do + end do + end if + + call mpi_scatterv(scatterv,all_dim,displ,& + & mpi_real,locx,nrow,& + & mpi_real,rootrank,icomm,info) + + if (me==root) deallocate(all_dim, l_t_g_all, displ, scatterv) + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return + +end subroutine psb_sscatterv diff --git a/base/comm/psb_zgather.f90 b/base/comm/psb_zgather.f90 index 11dac835..7f1d57e0 100644 --- a/base/comm/psb_zgather.f90 +++ b/base/comm/psb_zgather.f90 @@ -139,7 +139,7 @@ subroutine psb_zgatherm(globx, locx, desc_a, info, iroot) do j=1,k do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx,jglobx+j-1) = locx(i,jlx+j-1) end do end do @@ -149,7 +149,7 @@ subroutine psb_zgatherm(globx, locx, desc_a, info, iroot) do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx,jglobx+j-1) = zzero end if end do @@ -304,14 +304,14 @@ subroutine psb_zgatherv(globx, locx, desc_a, info, iroot) globx(:)=0.d0 do i=1,psb_cd_get_local_rows(desc_a) - idx = desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) globx(idx) = locx(i) end do ! adjust overlapped elements do i=1, size(desc_a%ovrlap_elem,1) if (me /= desc_a%ovrlap_elem(i,3)) then idx = desc_a%ovrlap_elem(i,1) - idx = desc_a%loc_to_glob(idx) + idx = desc_a%idxmap%loc_to_glob(idx) globx(idx) = dzero end if end do diff --git a/base/comm/psb_zscatter.F90 b/base/comm/psb_zscatter.F90 index 95321efd..d5a55e70 100644 --- a/base/comm/psb_zscatter.F90 +++ b/base/comm/psb_zscatter.F90 @@ -152,7 +152,7 @@ subroutine psb_zscatterm(globx, locx, desc_a, info, iroot) ! extract my chunk do j=1,k do i=1, nrow - idx=desc_a%loc_to_glob(i) + idx = desc_a%idxmap%loc_to_glob(i) locx(i,jlocx+j-1)=globx(idx,jglobx+j-1) end do end do @@ -187,7 +187,7 @@ subroutine psb_zscatterm(globx, locx, desc_a, info, iroot) end if - call mpi_gatherv(desc_a%loc_to_glob,nrow,& + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& & mpi_integer,l_t_g_all,all_dim,& & displ,mpi_integer,rootrank,icomm,info) @@ -371,7 +371,7 @@ subroutine psb_zscatterv(globx, locx, desc_a, info, iroot) if ((root == -1).or.(np==1)) then ! extract my chunk do i=1, nrow - idx=desc_a%loc_to_glob(i) + idx=desc_a%idxmap%loc_to_glob(i) locx(i)=globx(idx) end do else @@ -397,7 +397,7 @@ subroutine psb_zscatterv(globx, locx, desc_a, info, iroot) allocate(l_t_g_all(sum(all_dim)),scatterv(sum(all_dim))) end if - call mpi_gatherv(desc_a%loc_to_glob,nrow,& + call mpi_gatherv(desc_a%idxmap%loc_to_glob,nrow,& & mpi_integer,l_t_g_all,all_dim,& & displ,mpi_integer,rootrank,icomm,info) diff --git a/base/internals/Makefile b/base/internals/Makefile index 25534b5f..86e8b5f6 100644 --- a/base/internals/Makefile +++ b/base/internals/Makefile @@ -3,7 +3,7 @@ include ../../Make.inc FOBJS = psi_compute_size.o psi_crea_bnd_elem.o psi_crea_index.o \ psi_crea_ovr_elem.o psi_bld_tmpovrl.o psi_dl_check.o \ psi_sort_dl.o \ - psi_ldsc_pre_halo.o psi_bld_tmphalo.o psi_bld_hash.o\ + psi_ldsc_pre_halo.o psi_bld_tmphalo.o psi_bld_g2lmap.o\ psi_sort_dl.o psi_idx_cnv.o psi_idx_ins_cnv.o FOBJS2 = psi_exist_ovr_elem.o psi_list_search.o srtlist.o #COBJS = avltree.o srcht.o diff --git a/base/internals/psi_bld_hash.f90 b/base/internals/psi_bld_g2lmap.f90 similarity index 77% rename from base/internals/psi_bld_hash.f90 rename to base/internals/psi_bld_g2lmap.f90 index 17edcf5d..09506517 100644 --- a/base/internals/psi_bld_hash.f90 +++ b/base/internals/psi_bld_g2lmap.f90 @@ -41,14 +41,14 @@ ! desc - type(psb_desc_type). The communication descriptor. ! info - integer. return code. ! -subroutine psi_bld_hash(desc,info) +subroutine psi_bld_g2lmap(desc,info) use psb_descriptor_type use psb_serial_mod use psb_const_mod use psb_error_mod use psb_penv_mod use psb_realloc_mod - use psi_mod, psb_protect_name => psi_bld_hash + use psi_mod, psb_protect_name => psi_bld_g2lmap implicit none type(psb_desc_type), intent(inout) :: desc integer, intent(out) :: info @@ -59,7 +59,7 @@ subroutine psi_bld_hash(desc,info) character(len=20) :: name,ch_err info = 0 - name = 'psi_bld_hash' + name = 'psi_bld_g2lmap' call psb_erractionsave(err_act) ictxt = psb_cd_get_context(desc) @@ -84,7 +84,7 @@ subroutine psi_bld_hash(desc,info) nk = n_col - call psb_realloc(nk,2,desc%glb_lc,info) + call psb_realloc(nk,2,desc%idxmap%glb_lc,info) nbits = psb_hash_bits hsize = 2**nbits @@ -102,9 +102,9 @@ subroutine psi_bld_hash(desc,info) hsize = hsize * 2 end do hmask = hsize - 1 - desc%hashvsize = hsize - desc%hashvmask = hmask - if (info ==0) call psb_realloc(hsize+1,desc%hashv,info,lb=0) + desc%idxmap%hashvsize = hsize + desc%idxmap%hashvmask = hmask + if (info ==0) call psb_realloc(hsize+1,desc%idxmap%hashv,info,lb=0) if (info /= 0) then ch_err='psb_realloc' call psb_errpush(info,name,a_err=ch_err) @@ -113,37 +113,38 @@ subroutine psi_bld_hash(desc,info) ! Build a hashed table of sorted lists to search for ! indices. - desc%hashv(0:hsize) = 0 + desc%idxmap%hashv(0:hsize) = 0 do i=1, nk - key = desc%loc_to_glob(i) + key = desc%idxmap%loc_to_glob(i) ih = iand(key,hmask) - desc%hashv(ih) = desc%hashv(ih) + 1 + desc%idxmap%hashv(ih) = desc%idxmap%hashv(ih) + 1 end do - nh = desc%hashv(0) + nh = desc%idxmap%hashv(0) idx = 1 do i=1, hsize - desc%hashv(i-1) = idx + desc%idxmap%hashv(i-1) = idx idx = idx + nh - nh = desc%hashv(i) + nh = desc%idxmap%hashv(i) end do do i=1, nk - key = desc%loc_to_glob(i) - ih = iand(key,hmask) - idx = desc%hashv(ih) - desc%glb_lc(idx,1) = key - desc%glb_lc(idx,2) = i - desc%hashv(ih) = desc%hashv(ih) + 1 + key = desc%idxmap%loc_to_glob(i) + ih = iand(key,hmask) + idx = desc%idxmap%hashv(ih) + desc%idxmap%glb_lc(idx,1) = key + desc%idxmap%glb_lc(idx,2) = i + desc%idxmap%hashv(ih) = desc%idxmap%hashv(ih) + 1 end do do i = hsize, 1, -1 - desc%hashv(i) = desc%hashv(i-1) + desc%idxmap%hashv(i) = desc%idxmap%hashv(i-1) end do - desc%hashv(0) = 1 + desc%idxmap%hashv(0) = 1 do i=0, hsize-1 - idx = desc%hashv(i) - nh = desc%hashv(i+1) - desc%hashv(i) + idx = desc%idxmap%hashv(i) + nh = desc%idxmap%hashv(i+1) - desc%idxmap%hashv(i) if (nh > 1) then - call psb_msort(desc%glb_lc(idx:idx+nh-1,1),& - & ix=desc%glb_lc(idx:idx+nh-1,2),flag=psb_sort_keep_idx_) + call psb_msort(desc%idxmap%glb_lc(idx:idx+nh-1,1),& + & ix=desc%idxmap%glb_lc(idx:idx+nh-1,2),& + & flag=psb_sort_keep_idx_) end if end do @@ -161,4 +162,4 @@ subroutine psi_bld_hash(desc,info) return -end subroutine psi_bld_hash +end subroutine psi_bld_g2lmap diff --git a/base/internals/psi_bld_tmphalo.f90 b/base/internals/psi_bld_tmphalo.f90 index 66d9be71..6cd5d8e6 100644 --- a/base/internals/psi_bld_tmphalo.f90 +++ b/base/internals/psi_bld_tmphalo.f90 @@ -98,10 +98,11 @@ subroutine psi_bld_tmphalo(desc,info) end if do i=1, nh - helem(i) = desc%loc_to_glob(n_row+i) + helem(i) = n_row+i ! desc%loc_to_glob(n_row+i) end do - - call psi_fnd_owner(nh,helem,hproc,desc,info) + call psb_map_l2g(helem,desc%idxmap,info) + if (info == 0) & + & call psi_fnd_owner(nh,helem,hproc,desc,info) if (info /= 0) then call psb_errpush(4010,name,a_err='fnd_owner') goto 9999 diff --git a/base/internals/psi_desc_index.F90 b/base/internals/psi_desc_index.F90 index b1d22084..e41ea528 100644 --- a/base/internals/psi_desc_index.F90 +++ b/base/internals/psi_desc_index.F90 @@ -257,10 +257,14 @@ subroutine psi_desc_index(desc,index_in,dep_list,& sndbuf(bsdindx(proc+1)+j) = (index_in(i+j)) end do else - - do j=1, nerv - sndbuf(bsdindx(proc+1)+j) = desc%loc_to_glob(index_in(i+j)) - end do + call psb_map_l2g(index_in(i+1:i+nerv),& + & sndbuf(bsdindx(proc+1)+1:bsdindx(proc+1)+nerv),& + & desc%idxmap,info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='psb_map_l2g') + goto 9999 + end if + endif bsdindx(proc+1) = bsdindx(proc+1) + nerv i = i + nerv + 1 diff --git a/base/internals/psi_extrct_dl.F90 b/base/internals/psi_extrct_dl.F90 index 87f343e5..46d5f73e 100644 --- a/base/internals/psi_extrct_dl.F90 +++ b/base/internals/psi_extrct_dl.F90 @@ -161,6 +161,7 @@ subroutine psi_extract_dep_list(desc_data,desc_str,dep_list,& i=1 if (debug_level >= psb_debug_inner_)& & write(debug_unit,*) me,' ',trim(name),': start ',info,desc_data(psb_dec_type_) + pointer_dep_list=1 if (psb_is_bld_dec(desc_data(psb_dec_type_))) then do while (desc_str(i) /= -1) diff --git a/base/internals/psi_idx_cnv.f90 b/base/internals/psi_idx_cnv.f90 index b81e7c49..bb43b0cb 100644 --- a/base/internals/psi_idx_cnv.f90 +++ b/base/internals/psi_idx_cnv.f90 @@ -134,9 +134,9 @@ subroutine psi_idx_cnv1(nv,idxin,desc,info,mask,owned) idxin(i) = -1 cycle endif - call psi_inner_cnv(ip,lip,desc%hashvmask,desc%hashv,desc%glb_lc) - if ((lip < 0).and.associated(desc%hash)) & - & call psb_hash_searchkey(ip,lip,desc%hash,info) + call psi_inner_cnv(ip,lip,desc%idxmap%hashvmask,desc%idxmap%hashv,desc%idxmap%glb_lc) + if (lip < 0) & + & call psb_hash_searchkey(ip,lip,desc%idxmap%hash,info) if (owned_) then if (lip<=nrow) then idxin(i) = lip @@ -155,9 +155,9 @@ subroutine psi_idx_cnv1(nv,idxin,desc,info,mask,owned) idxin(i) = -1 cycle endif - call psi_inner_cnv(ip,lip,desc%hashvmask,desc%hashv,desc%glb_lc) - if ((lip < 0).and.associated(desc%hash)) & - & call psb_hash_searchkey(ip,lip,desc%hash,info) + call psi_inner_cnv(ip,lip,desc%idxmap%hashvmask,desc%idxmap%hashv,desc%idxmap%glb_lc) + if (lip < 0) & + & call psb_hash_searchkey(ip,lip,desc%idxmap%hash,info) if (owned_) then if (lip<=nrow) then idxin(i) = lip @@ -175,11 +175,11 @@ subroutine psi_idx_cnv1(nv,idxin,desc,info,mask,owned) ! are kept in a (hashed) list of ordered lists, ! hence psi_inner_cnv does the hashing and binary search. ! - if (.not.allocated(desc%hashv)) then + if (.not.allocated(desc%idxmap%hashv)) then info = 4001 call psb_errpush(info,name,a_err='Invalid hashv into inner_cnv') end if - call psi_inner_cnv(nv,idxin,desc%hashvmask,desc%hashv,desc%glb_lc,mask=mask) + call psi_inner_cnv(nv,idxin,desc%idxmap%hashvmask,desc%idxmap%hashv,desc%idxmap%glb_lc,mask=mask) end if else @@ -197,7 +197,7 @@ subroutine psi_idx_cnv1(nv,idxin,desc,info,mask,owned) call psb_errpush(info,name) goto 9999 endif - lip = desc%glob_to_loc(ip) + lip = desc%idxmap%glob_to_loc(ip) if (owned_) then if (lip<=nrow) then idxin(i) = lip @@ -217,7 +217,7 @@ subroutine psi_idx_cnv1(nv,idxin,desc,info,mask,owned) call psb_errpush(info,name) goto 9999 endif - lip = desc%glob_to_loc(ip) + lip = desc%idxmap%glob_to_loc(ip) if (owned_) then if (lip<=nrow) then idxin(i) = lip diff --git a/base/internals/psi_idx_ins_cnv.f90 b/base/internals/psi_idx_ins_cnv.f90 index e5a88ed6..30f6863f 100644 --- a/base/internals/psi_idx_ins_cnv.f90 +++ b/base/internals/psi_idx_ins_cnv.f90 @@ -122,16 +122,16 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) endif nxt = ncol + 1 - call psi_inner_cnv(ip,lip,desc%hashvmask,desc%hashv,desc%glb_lc) + call psi_inner_cnv(ip,lip,desc%idxmap%hashvmask,desc%idxmap%hashv,desc%idxmap%glb_lc) if (lip < 0) & - & call psb_hash_searchinskey(ip,lip,nxt,desc%hash,info) + & call psb_hash_searchinskey(ip,lip,nxt,desc%idxmap%hash,info) if (info >=0) then if (nxt == lip) then ncol = nxt - isize = size(desc%loc_to_glob) + isize = size(desc%idxmap%loc_to_glob) if (ncol > isize) then nh = ncol + max(nv,relocsz) - call psb_realloc(nh,desc%loc_to_glob,info,pad=-1) + call psb_realloc(nh,desc%idxmap%loc_to_glob,info,pad=-1) if (info /= 0) then info=1 ch_err='psb_realloc' @@ -140,7 +140,7 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) end if isize = nh endif - desc%loc_to_glob(nxt) = ip + desc%idxmap%loc_to_glob(nxt) = ip endif info = 0 else @@ -165,16 +165,16 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) endif nxt = ncol + 1 - call psi_inner_cnv(ip,lip,desc%hashvmask,desc%hashv,desc%glb_lc) + call psi_inner_cnv(ip,lip,desc%idxmap%hashvmask,desc%idxmap%hashv,desc%idxmap%glb_lc) if (lip < 0) & - & call psb_hash_searchinskey(ip,lip,nxt,desc%hash,info) + & call psb_hash_searchinskey(ip,lip,nxt,desc%idxmap%hash,info) if (info >=0) then if (nxt == lip) then ncol = nxt - isize = size(desc%loc_to_glob) + isize = size(desc%idxmap%loc_to_glob) if (ncol > isize) then nh = ncol + max(nv,relocsz) - call psb_realloc(nh,desc%loc_to_glob,info,pad=-1) + call psb_realloc(nh,desc%idxmap%loc_to_glob,info,pad=-1) if (info /= 0) then info=1 ch_err='psb_realloc' @@ -183,7 +183,7 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) end if isize = nh endif - desc%loc_to_glob(nxt) = ip + desc%idxmap%loc_to_glob(nxt) = ip endif info = 0 else @@ -238,17 +238,17 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) idxin(i) = -1 cycle endif - k = desc%glob_to_loc(ip) + k = desc%idxmap%glob_to_loc(ip) if (k < -np) then k = k + np k = - k - 1 ncol = ncol + 1 lip = ncol - desc%glob_to_loc(ip) = ncol - isize = size(desc%loc_to_glob) + desc%idxmap%glob_to_loc(ip) = ncol + isize = size(desc%idxmap%loc_to_glob) if (ncol > isize) then nh = ncol + max(nv,relocsz) - call psb_realloc(nh,desc%loc_to_glob,info,pad=-1) + call psb_realloc(nh,desc%idxmap%loc_to_glob,info,pad=-1) if (info /= 0) then info=3 ch_err='psb_realloc' @@ -257,7 +257,7 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) end if isize = nh endif - desc%loc_to_glob(ncol) = ip + desc%idxmap%loc_to_glob(ncol) = ip isize = size(desc%halo_index) if ((pnt_halo+3) > isize) then nh = isize + max(nv,relocsz) @@ -291,17 +291,17 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) idxin(i) = -1 cycle endif - k = desc%glob_to_loc(ip) + k = desc%idxmap%glob_to_loc(ip) if (k < -np) then k = k + np k = - k - 1 ncol = ncol + 1 lip = ncol - desc%glob_to_loc(ip) = ncol - isize = size(desc%loc_to_glob) + desc%idxmap%glob_to_loc(ip) = ncol + isize = size(desc%idxmap%loc_to_glob) if (ncol > isize) then nh = ncol + max(nv,relocsz) - call psb_realloc(nh,desc%loc_to_glob,info,pad=-1) + call psb_realloc(nh,desc%idxmap%loc_to_glob,info,pad=-1) if (info /= 0) then info=3 ch_err='psb_realloc' @@ -310,7 +310,7 @@ subroutine psi_idx_ins_cnv1(nv,idxin,desc,info,mask) end if isize = nh endif - desc%loc_to_glob(ncol) = ip + desc%idxmap%loc_to_glob(ncol) = ip isize = size(desc%halo_index) if ((pnt_halo+3) > isize) then nh = isize + max(nv,relocsz) diff --git a/base/internals/psi_ldsc_pre_halo.f90 b/base/internals/psi_ldsc_pre_halo.f90 index bf11faf7..275f53a4 100644 --- a/base/internals/psi_ldsc_pre_halo.f90 +++ b/base/internals/psi_ldsc_pre_halo.f90 @@ -87,21 +87,19 @@ subroutine psi_ldsc_pre_halo(desc,ext_hv,info) goto 9999 end if - call psi_bld_hash(desc,info) + call psi_bld_g2lmap(desc,info) if (info /= 0) then ch_err='psi_bld_hash' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - ! We no longer need the desc%hash structure. - if (associated(desc%hash)) then - deallocate(desc%hash,stat=info) - if (info /= 0) then - ch_err='psi_bld_tmphalo' - info = 4010 - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if + ! We no longer need the inner hash structure. + call psb_free(desc%idxmap%hash,info) + if (info /= 0) then + ch_err='psi_bld_tmphalo' + info = 4010 + call psb_errpush(info,name,a_err=ch_err) + goto 9999 end if if (.not.ext_hv) then call psi_bld_tmphalo(desc,info) diff --git a/base/internals/psi_sort_dl.f90 b/base/internals/psi_sort_dl.f90 index 8b681756..f07b2292 100644 --- a/base/internals/psi_sort_dl.f90 +++ b/base/internals/psi_sort_dl.f90 @@ -69,7 +69,7 @@ subroutine psi_sort_dl(dep_list,l_dep_list,np,info) isz = iich + ndgmx if (debug_level >= psb_debug_inner_)& & write(debug_unit,*) name,': ndgmx ',ndgmx,isz - + allocate(work(isz)) ! call srtlist(dep_list, dl_lda, l_dep_list, np, info) call srtlist(dep_list,size(dep_list,1),l_dep_list,np,work(idg),& diff --git a/base/modules/psb_comm_mod.f90 b/base/modules/psb_comm_mod.f90 index 686c3794..6f79a774 100644 --- a/base/modules/psb_comm_mod.f90 +++ b/base/modules/psb_comm_mod.f90 @@ -267,6 +267,38 @@ module psb_comm_mod integer, intent(out) :: info integer, intent(in), optional :: root end subroutine psb_iscatterv + subroutine psb_sscatterm(globx, locx, desc_a, info, root) + use psb_descriptor_type + real(psb_spk_), intent(out) :: locx(:,:) + real(psb_spk_), intent(in) :: globx(:,:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: root + end subroutine psb_sscatterm + subroutine psb_sscatterv(globx, locx, desc_a, info, root) + use psb_descriptor_type + real(psb_spk_), intent(out) :: locx(:) + real(psb_spk_), intent(in) :: globx(:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: root + end subroutine psb_sscatterv + subroutine psb_cscatterm(globx, locx, desc_a, info, root) + use psb_descriptor_type + complex(psb_spk_), intent(out) :: locx(:,:) + complex(psb_spk_), intent(in) :: globx(:,:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: root + end subroutine psb_cscatterm + subroutine psb_cscatterv(globx, locx, desc_a, info, root) + use psb_descriptor_type + complex(psb_spk_), intent(out) :: locx(:) + complex(psb_spk_), intent(in) :: globx(:) + type(psb_desc_type), intent(in) :: desc_a + integer, intent(out) :: info + integer, intent(in), optional :: root + end subroutine psb_cscatterv end interface interface psb_gather diff --git a/base/modules/psb_desc_type.f90 b/base/modules/psb_desc_type.f90 index d153b7b7..b1bf2fad 100644 --- a/base/modules/psb_desc_type.f90 +++ b/base/modules/psb_desc_type.f90 @@ -125,19 +125,26 @@ module psb_descriptor_type ! type: psb_desc_type ! ! Communication Descriptor data structure. + !| type psb_idxmap_type + !| integer :: state + !| integer, allocatable :: loc_to_glob(:) + !| integer, allocatable :: glob_to_loc(:) + !| integer :: hashvsize, hashvmask + !| integer, allocatable :: hashv(:), glb_lc(:,:) + !| type(psb_hash_type) :: hash + !| end type psb_idxmap_type + ! + ! !| type psb_desc_type - !| integer, allocatable :: matrix_data(:) - !| integer, allocatable :: halo_index(:), ext_index(:) - !| integer, allocatable :: bnd_elem(:) - !| integer, allocatable :: ovrlap_index(:) - !| integer, allocatable :: ovrlap_elem(:,:) - !| integer, allocatable :: ovr_mst_idx(:) - !| integer, allocatable :: loc_to_glob(:) - !| integer, allocatable :: glob_to_loc (:) - !| integer, allocatable :: hashv(:), glb_lc(:,:) - !| integer, allocatable :: lprm(:) - !| integer, allocatable :: idx_space(:) - !| type(psb_hash_type), pointer :: hash => null() + !| integer, allocatable :: matrix_data(:) + !| integer, allocatable :: halo_index(:), ext_index(:) + !| integer, allocatable :: bnd_elem(:) + !| integer, allocatable :: ovrlap_index(:) + !| integer, allocatable :: ovrlap_elem(:,:) + !| integer, allocatable :: ovr_mst_idx(:) + !| type(psb_idxmap_type) :: idxmap + !| integer, allocatable :: lprm(:) + !| integer, allocatable :: idx_space(:) !| type(psb_desc_type), pointer :: base_desc => null() !| end type psb_desc_type ! @@ -161,9 +168,9 @@ module psb_descriptor_type ! and is only entered by the psb_cdrep call. Currently it is only ! used in the last level of some multilevel preconditioners. ! - ! The LOC_TO_GLOB, GLOB_TO_LOC, GLB_LC, HASHV and AVLTREE arrays implement the - ! mapping between local and global indices, according to the following - ! guidelines: + ! The LOC_TO_GLOB, GLOB_TO_LOC, GLB_LC, HASHV and HASH data structures + ! inside IDXMAP implement the mapping between local and global indices, + ! according to the following guidelines: ! ! 1. Each global index I is owned by at least one process; ! @@ -208,11 +215,9 @@ module psb_descriptor_type ! During the build phase glb_lc() will store the indices of the internal points, ! i.e. local indices 1:NROW, since those are known ad CDALL time. ! The halo indices that we encounter during the build phase are put in - ! a PSB_HASH_TYPE data structure, which implements a very simple hash, which will - ! nonetheless be quite fast at low occupancy rates. + ! a PSB_HASH_TYPE data structure, which implements a very simple hash; this + ! hash will nonetheless be quite fast at low occupancy rates. ! At assembly time, we move everything into hashv(:) and glb_lc(:,:). - ! Note that the desc%hash component is a pointer, but it really should be - ! an allocatable scalar. ! ! 7. The data exchange is based on lists of local indices to be exchanged; all the ! lists have the same format, as follows: @@ -294,7 +299,7 @@ module psb_descriptor_type ! 1. Allows a purely local matrix/stencil buildup phase, requiring only ! one synch point at the end (CDASB) ! 2. Takes shortcuts when the problem size is not too large (the default threshold - ! assumes that you are willing to spend up to 16 MB on each process for the + ! assumes that you are willing to spend up to 4 MB on each process for the ! glob_to_loc mapping) ! 3. Supports restriction/prolongation operators with the same routines ! just choosing (in the swapdata/swaptran internals) on which index list @@ -302,26 +307,31 @@ module psb_descriptor_type ! ! ! - type psb_desc_type - integer, allocatable :: matrix_data(:) - integer, allocatable :: halo_index(:) - integer, allocatable :: ext_index(:) - integer, allocatable :: ovrlap_index(:) - integer, allocatable :: ovrlap_elem(:,:) - integer, allocatable :: ovr_mst_idx(:) - integer, allocatable :: bnd_elem(:) + type psb_idxmap_type + integer :: state integer, allocatable :: loc_to_glob(:) - integer, allocatable :: glob_to_loc (:) + integer, allocatable :: glob_to_loc(:) integer :: hashvsize, hashvmask integer, allocatable :: hashv(:), glb_lc(:,:) - integer, allocatable :: lprm(:) - integer, allocatable :: idx_space(:) - type(psb_hash_type), pointer :: hash => null() + type(psb_hash_type) :: hash + end type psb_idxmap_type + + type psb_desc_type + integer, allocatable :: matrix_data(:) + integer, allocatable :: halo_index(:) + integer, allocatable :: ext_index(:) + integer, allocatable :: ovrlap_index(:) + integer, allocatable :: ovrlap_elem(:,:) + integer, allocatable :: ovr_mst_idx(:) + integer, allocatable :: bnd_elem(:) + type(psb_idxmap_type) :: idxmap + integer, allocatable :: lprm(:) type(psb_desc_type), pointer :: base_desc => null() + integer, allocatable :: idx_space(:) end type psb_desc_type interface psb_sizeof - module procedure psb_cd_sizeof + module procedure psb_cd_sizeof, psb_idxmap_sizeof end interface interface psb_is_ok_desc @@ -349,21 +359,42 @@ module psb_descriptor_type end interface - interface psb_cdtransfer - module procedure psb_cdtransfer + interface psb_transfer + module procedure psb_cdtransfer, psb_idxmap_transfer end interface - interface psb_cdfree - module procedure psb_cdfree + interface psb_free + module procedure psb_cdfree, psb_idxmap_free end interface + interface psb_map_l2g + module procedure psb_map_l2g_s1, psb_map_l2g_s2,& + & psb_map_l2g_v1, psb_map_l2g_v2 + end interface integer, private, save :: cd_large_threshold=psb_default_large_threshold contains + function psb_idxmap_sizeof(map) result(val) + implicit none + !....Parameters... + + Type(psb_idxmap_type), intent(in) :: map + integer(psb_long_int_k_) :: val + + val = 3*psb_sizeof_int + if (allocated(map%loc_to_glob)) val = val + psb_sizeof_int*size(map%loc_to_glob) + if (allocated(map%glob_to_loc)) val = val + psb_sizeof_int*size(map%glob_to_loc) + if (allocated(map%hashv)) val = val + psb_sizeof_int*size(map%hashv) + if (allocated(map%glb_lc)) val = val + psb_sizeof_int*size(map%glb_lc) + val = val + psb_sizeof(map%hash) + + end function psb_idxmap_sizeof + + function psb_cd_sizeof(desc) result(val) implicit none !....Parameters... @@ -371,7 +402,7 @@ contains Type(psb_desc_type), intent(in) :: desc integer(psb_long_int_k_) :: val - val = 0 + val = 0 if (allocated(desc%matrix_data)) val = val + psb_sizeof_int*size(desc%matrix_data) if (allocated(desc%halo_index)) val = val + psb_sizeof_int*size(desc%halo_index) if (allocated(desc%ext_index)) val = val + psb_sizeof_int*size(desc%ext_index) @@ -379,13 +410,9 @@ contains if (allocated(desc%ovrlap_index)) val = val + psb_sizeof_int*size(desc%ovrlap_index) if (allocated(desc%ovrlap_elem)) val = val + psb_sizeof_int*size(desc%ovrlap_elem) if (allocated(desc%ovr_mst_idx)) val = val + psb_sizeof_int*size(desc%ovr_mst_idx) - if (allocated(desc%loc_to_glob)) val = val + psb_sizeof_int*size(desc%loc_to_glob) - if (allocated(desc%glob_to_loc)) val = val + psb_sizeof_int*size(desc%glob_to_loc) - if (allocated(desc%hashv)) val = val + psb_sizeof_int*size(desc%hashv) - if (allocated(desc%glb_lc)) val = val + psb_sizeof_int*size(desc%glb_lc) if (allocated(desc%lprm)) val = val + psb_sizeof_int*size(desc%lprm) if (allocated(desc%idx_space)) val = val + psb_sizeof_int*size(desc%idx_space) - if (associated(desc%hash)) val = val + psb_sizeof(desc%hash) + val = val + psb_sizeof(desc%idxmap) end function psb_cd_sizeof @@ -425,7 +452,7 @@ contains type(psb_desc_type), intent(inout) :: desc ! We have nothing left to do here. ! Perhaps we should delete this subroutine? - nullify(desc%hash,desc%base_desc) + nullify(desc%base_desc) end subroutine psb_nullify_desc @@ -601,7 +628,7 @@ contains type(psb_desc_type), intent(in) :: desc if (allocated(desc%matrix_data)) then - psb_cd_get_size = desc%matrix_data(psb_desc_size_) + psb_cd_get_size = desc%idxmap%state else psb_cd_get_size = -1 call psb_errpush(1122,'psb_cd_get_size') @@ -733,6 +760,59 @@ contains return end subroutine psb_cd_get_list + subroutine psb_idxmap_free(map,info) + !...free descriptor structure... + use psb_const_mod + use psb_error_mod + use psb_penv_mod + implicit none + !....parameters... + type(psb_idxmap_type), intent(inout) :: map + integer, intent(out) :: info + !...locals.... + integer :: ictxt,np,me, err_act + character(len=*), parameter :: name = 'psb_idxmap_free' + + if(psb_get_errstatus() /= 0) return + info=0 + call psb_erractionsave(err_act) + + if (allocated(map%loc_to_glob)) then + deallocate(map%loc_to_glob,stat=info) + end if + if ((info == 0).and.allocated(map%glob_to_loc)) then + deallocate(map%glob_to_loc,stat=info) + end if + if ((info == 0).and.allocated(map%hashv)) then + deallocate(map%hashv,stat=info) + end if + if ((info == 0).and.allocated(map%glb_lc)) then + deallocate(map%glb_lc,stat=info) + end if + if (info /= 0) call psb_free(map%hash, info) + if (info /= 0) then + info=2052 + call psb_errpush(info,name) + goto 9999 + end if + + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_ret_) then + return + else + call psb_error(ictxt) + end if + return + + end subroutine psb_idxmap_free + + ! ! Subroutine: psb_cdfree ! Frees a descriptor data structure. @@ -775,36 +855,7 @@ contains goto 9999 endif - !...deallocate desc_a.... - if(.not.allocated(desc_a%loc_to_glob)) then - info=296 - call psb_errpush(info,name) - goto 9999 - end if - - !deallocate loc_to_glob field - deallocate(desc_a%loc_to_glob,stat=info) - if (info /= 0) then - info=2051 - call psb_errpush(info,name) - goto 9999 - end if - - if (.not.psb_is_large_desc(desc_a)) then - if (.not.allocated(desc_a%glob_to_loc)) then - info=297 - call psb_errpush(info,name) - goto 9999 - end if - - !deallocate glob_to_loc field - deallocate(desc_a%glob_to_loc,stat=info) - if (info /= 0) then - info=2052 - call psb_errpush(info,name) - goto 9999 - end if - endif + call psb_free(desc_a%idxmap,info) if (.not.allocated(desc_a%halo_index)) then info=298 @@ -873,33 +924,6 @@ contains goto 9999 end if - if (allocated(desc_a%hashv)) then - deallocate(desc_a%hashv,stat=info) - if (info /= 0) then - info=2058 - call psb_errpush(info,name) - goto 9999 - end if - end if - - if (allocated(desc_a%glb_lc)) then - deallocate(desc_a%glb_lc,stat=info) - if (info /= 0) then - info=2059 - call psb_errpush(info,name) - goto 9999 - end if - end if - - if (associated(desc_a%hash)) then - deallocate(desc_a%hash,stat=info) - if (info /= 0) then - info=2060 - call psb_errpush(info,name) - goto 9999 - end if - end if - if (allocated(desc_a%idx_space)) then deallocate(desc_a%idx_space,stat=info) if (info /= 0) then @@ -988,20 +1012,138 @@ contains & call psb_transfer( desc_in%ovr_mst_idx , desc_out%ovr_mst_idx , info) if (info == 0) & & call psb_transfer( desc_in%ext_index , desc_out%ext_index , info) - if (info == 0) & - & call psb_transfer( desc_in%loc_to_glob , desc_out%loc_to_glob , info) - if (info == 0) & - & call psb_transfer( desc_in%glob_to_loc , desc_out%glob_to_loc , info) if (info == 0) & & call psb_transfer( desc_in%lprm , desc_out%lprm , info) if (info == 0) & & call psb_transfer( desc_in%idx_space , desc_out%idx_space , info) + if (info == 0) & + & call psb_transfer(desc_in%idxmap, desc_out%idxmap,info) + if (info /= 0) then + info = 4010 + call psb_errpush(info,name) + goto 9999 + endif + if (debug_level >= psb_debug_ext_) & + & write(debug_unit,*) me,' ',trim(name),': end' + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_ret_) then + return + else + call psb_error(ictxt) + end if + return + + end subroutine psb_cdtransfer + + subroutine psb_idxmap_transfer(map_in, map_out, info) + + use psb_realloc_mod + use psb_const_mod + use psb_error_mod + use psb_penv_mod + + implicit none + !....parameters... + + type(psb_idxmap_type), intent(inout) :: map_in + type(psb_idxmap_type), intent(inout) :: map_out + integer, intent(out) :: info + + !locals + integer :: np,me,ictxt, err_act + integer :: debug_level, debug_unit + character(len=*), parameter :: name = 'psb_idxmap_transfer' + + if (psb_get_errstatus()/=0) return + info = 0 + call psb_erractionsave(err_act) + + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + map_out%state = map_in%state + map_out%hashvsize = map_in%hashvsize + map_out%hashvmask = map_in%hashvmask + + if (info == 0) & + & call psb_transfer( map_in%loc_to_glob , map_out%loc_to_glob , info) + if (info == 0) & + & call psb_transfer( map_in%glob_to_loc , map_out%glob_to_loc , info) if (info == 0) & - & call psb_transfer( desc_in%hashv , desc_out%hashv , info) + & call psb_transfer( map_in%hashv , map_out%hashv , info) if (info == 0) & - & call psb_transfer( desc_in%glb_lc , desc_out%glb_lc , info) + & call psb_transfer( map_in%glb_lc , map_out%glb_lc , info) + if (info == 0) & + & call psb_transfer( map_in%hash , map_out%hash , info) + + if (info /= 0) then + info = 4010 + call psb_errpush(info,name) + goto 9999 + endif + if (debug_level >= psb_debug_ext_) & + & write(debug_unit,*) me,' ',trim(name),': end' + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_ret_) then + return + else + call psb_error() + end if + return + + end subroutine psb_idxmap_transfer + + subroutine psb_idxmap_copy(map_in, map_out, info) + + use psb_realloc_mod + use psb_const_mod + use psb_error_mod + use psb_penv_mod + + implicit none + !....parameters... + + type(psb_idxmap_type), intent(in) :: map_in + type(psb_idxmap_type), intent(inout) :: map_out + integer, intent(out) :: info + + !locals + integer :: np,me,ictxt, err_act + integer :: debug_level, debug_unit + character(len=*), parameter :: name = 'psb_idxmap_transfer' - desc_out%hash => desc_in%hash; nullify(desc_in%hash) + if (psb_get_errstatus()/=0) return + info = 0 + call psb_erractionsave(err_act) + + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + map_out%state = map_in%state + map_out%hashvsize = map_in%hashvsize + map_out%hashvmask = map_in%hashvmask + + call psb_safe_ab_cpy( map_in%loc_to_glob , map_out%loc_to_glob , info) + if (info == 0) & + & call psb_safe_ab_cpy( map_in%glob_to_loc , map_out%glob_to_loc , info) + if (info == 0) & + & call psb_safe_ab_cpy( map_in%hashv , map_out%hashv , info) + if (info == 0) & + & call psb_safe_ab_cpy( map_in%glb_lc , map_out%glb_lc , info) + if (info == 0) & + & call psb_hash_copy( map_in%hash , map_out%hash , info) if (info /= 0) then info = 4010 @@ -1020,12 +1162,111 @@ contains if (err_act == psb_act_ret_) then return else - call psb_error(ictxt) + call psb_error() end if return - end subroutine psb_cdtransfer + end subroutine psb_idxmap_copy + + subroutine psb_map_l2g_s1(idx,map,info) + implicit none + integer, intent(inout) :: idx + integer, intent(out) :: info + type(psb_idxmap_type) :: map + integer :: nc + info = 0 + if (.not.allocated(map%loc_to_glob)) then + info = 140 + idx = -1 + return + end if + nc = size(map%loc_to_glob) + if ((idx < 1).or.(idx>nc)) then + info = 140 + idx = -1 + return + end if + idx = map%loc_to_glob(idx) + + end subroutine psb_map_l2g_s1 + + subroutine psb_map_l2g_s2(idx,gidx,map,info) + implicit none + integer, intent(in) :: idx + integer, intent(out) :: gidx, info + type(psb_idxmap_type) :: map + integer :: nc + + info = 0 + if (.not.allocated(map%loc_to_glob)) then + info = 140 + gidx = -1 + return + end if + nc = size(map%loc_to_glob) + if ((idx < 1).or.(idx>nc)) then + info = 140 + gidx = -1 + return + end if + gidx = map%loc_to_glob(idx) + + end subroutine psb_map_l2g_s2 + + subroutine psb_map_l2g_v1(idx,map,info) + implicit none + integer, intent(inout) :: idx(:) + integer, intent(out) :: info + type(psb_idxmap_type) :: map + integer :: nc, i, ix + + info = 0 + if (.not.allocated(map%loc_to_glob)) then + info = 140 + idx = -1 + return + end if + nc = size(map%loc_to_glob) + do i=1, size(idx) + ix = idx(i) + if ((ix < 1).or.(ix>nc)) then + info = 140 + idx(i) = -1 + else + idx(i) = map%loc_to_glob(ix) + end if + end do + + end subroutine psb_map_l2g_v1 + + subroutine psb_map_l2g_v2(idx,gidx,map,info) + implicit none + integer, intent(in) :: idx(:) + integer, intent(out) :: gidx(:),info + type(psb_idxmap_type) :: map + integer :: nc, i, ix + + info = 0 + if ((.not.allocated(map%loc_to_glob)).or.& + & (size(gidx)nc)) then + info = 140 + gidx(i) = -1 + else + gidx(i) = map%loc_to_glob(ix) + end if + end do + + end subroutine psb_map_l2g_v2 Subroutine psb_cd_get_recv_idx(tmp,desc,data,info,toglob) @@ -1098,12 +1339,12 @@ contains goto 9999 end if if (toglob) then - If(idx > Size(desc%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 endif - gidx = desc%loc_to_glob(idx) tmp(outcnt) = proc tmp(outcnt+1) = 1 tmp(outcnt+2) = gidx @@ -1114,7 +1355,7 @@ contains tmp(outcnt+2) = idx tmp(outcnt+3) = -1 end if - outcnt = outcnt+3 + outcnt = outcnt+3 end Do incnt = incnt+n_elem_recv+n_elem_send+3 end Do diff --git a/base/modules/psb_hash_mod.f90 b/base/modules/psb_hash_mod.f90 index 719b225c..da8a1efb 100644 --- a/base/modules/psb_hash_mod.f90 +++ b/base/modules/psb_hash_mod.f90 @@ -38,8 +38,10 @@ ! (See Knuth: TAOCP, Vol. 3, sec. 6.4) ! These hash functions are not very smart; however they are very simple and fast. ! The intended usage of this hash table is to store indices of halo points, which -! are supposed to be few compared to the internal indices -! (which are stored elsewhere), so in this context they are acceptable. +! are supposed to be few compared to the internal indices (which are stored elsewhere). +! Therefore, either the table has a very low occupancy, and this scheme will work, +! or we have lots more to worry about in parallel performance than the efficiency +! of this hashing scheme. ! ! ! @@ -64,10 +66,24 @@ module psb_hash_mod interface psb_hash_init module procedure psb_hash_init_v, psb_hash_init_n end interface + interface psb_sizeof module procedure psb_sizeof_hash_type end interface + + interface psb_transfer + module procedure HashTransfer + end interface + + interface psb_hash_copy + module procedure HashCopy + end interface + + interface psb_free + module procedure HashFree + end interface + contains @@ -90,12 +106,12 @@ contains function psb_Sizeof_hash_type(hash) result(val) - type(psb_hash_type), pointer :: hash + type(psb_hash_type) :: hash integer(psb_long_int_k_) :: val - val = 0 - if (associated(hash)) then - val = val + psb_sizeof_int * size(hash%table) - end if + val = 4*psb_sizeof_int + 2*psb_sizeof_long_int + if (allocated(hash%table)) & + & val = val + psb_sizeof_int * size(hash%table) + end function psb_Sizeof_hash_type @@ -106,6 +122,20 @@ contains psb_hash_avg_acc = dble(hash%nacc)/dble(hash%nsrch) end function psb_hash_avg_acc + subroutine HashFree(hashin,info) + use psb_realloc_mod + type(psb_hash_type) :: hashin + + info = 0 + if (allocated(hashin%table)) then + deallocate(hashin%table,stat=info) + end if + hashin%nbits = 0 + hashin%hsize = 0 + hashin%hmask = 0 + hashin%nk = 0 + end subroutine HashFree + subroutine HashTransfer(hashin,hashout,info) use psb_realloc_mod type(psb_hash_type) :: hashin @@ -310,6 +340,10 @@ contains integer :: i,j,k,hsize,hmask, hk, hd info = HashOK + if (.not.allocated(hash%table) ) then + val = HashFreeEntry + return + end if hsize = hash%hsize hmask = hash%hmask hk = iand(hashval(key),hmask) diff --git a/base/modules/psi_mod.f90 b/base/modules/psi_mod.f90 index 0bde09ca..486a782a 100644 --- a/base/modules/psi_mod.f90 +++ b/base/modules/psi_mod.f90 @@ -473,12 +473,20 @@ module psi_mod end subroutine psi_ldsc_pre_halo end interface - interface psi_bld_hash - subroutine psi_bld_hash(desc,info) +!!$ interface psi_bld_hash +!!$ subroutine psi_bld_hash(desc,info) +!!$ use psb_descriptor_type, only : psb_desc_type, psb_spk_, psb_dpk_ +!!$ type(psb_desc_type), intent(inout) :: desc +!!$ integer, intent(out) :: info +!!$ end subroutine psi_bld_hash +!!$ end interface + + interface psi_bld_g2lmap + subroutine psi_bld_g2lmap(desc,info) use psb_descriptor_type, only : psb_desc_type, psb_spk_, psb_dpk_ type(psb_desc_type), intent(inout) :: desc integer, intent(out) :: info - end subroutine psi_bld_hash + end subroutine psi_bld_g2lmap end interface interface psi_bld_tmphalo diff --git a/base/tools/psb_ccdbldext.F90 b/base/tools/psb_ccdbldext.F90 index 1f49296c..bdbb08d2 100644 --- a/base/tools/psb_ccdbldext.F90 +++ b/base/tools/psb_ccdbldext.F90 @@ -256,21 +256,18 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) Do j=0,n_elem_recv-1 idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 endif - - gidx = desc_ov%loc_to_glob(idx) - call psb_ensure_size((cntov_o+3),orig_ovr,info,pad=-1) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_ensure_size') goto 9999 end if - orig_ovr(cntov_o)=proc orig_ovr(cntov_o+1)=1 orig_ovr(cntov_o+2)=gidx @@ -353,21 +350,19 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) ! add recv elements in halo_index into ovrlap_index ! Do j=0,n_elem_recv-1 - If((counter+psb_elem_recv_+j)>Size(halo)) then + If ((counter+psb_elem_recv_+j)>Size(halo)) then info=-2 call psb_errpush(info,name) goto 9999 end If idx = halo(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 - endif - - gidx = desc_ov%loc_to_glob(idx) - + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -405,12 +400,19 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) ! Do j=0,n_elem_send-1 - idx = halo(counter+psb_elem_send_+j) - gidx = desc_ov%loc_to_glob(idx) - if (idx > psb_cd_get_local_rows(Desc_a)) & - & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& - & idx,psb_cd_get_local_rows(Desc_a) +!!$ idx = halo(counter+psb_elem_send_+j) +!!$ gidx = desc_ov%loc_to_glob(idx) +!!$ if (idx > psb_cd_get_local_rows(Desc_a)) & +!!$ & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& +!!$ & idx,psb_cd_get_local_rows(Desc_a) + idx = halo(counter+psb_elem_send_+j) + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then + info=-3 + call psb_errpush(info,name) + goto 9999 + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -456,9 +458,9 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - Do jj=1,n_elem - works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) - End Do + call psb_map_l2g(blk%ia2(1:n_elem),& + & works(idxs+tot_elem+1:idxs+tot_elem+n_elem),& + & desc_ov%idxmap,info) tot_elem=tot_elem+n_elem End If @@ -470,17 +472,12 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_msort_unique(works(idxs+1:idxs+tot_elem),i) tot_elem=i endif - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) sdsz(proc+1) = tot_elem idxs = idxs + tot_elem end if counter = counter+n_elem_send+3 - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop End',tmp_ovr_idx(1:10) + Enddo if (debug_level >= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& @@ -601,23 +598,23 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) idx=workr(i) if (idx <1) then write(0,*) me,'Error in CDBLDEXTBLD level',i_ovr,' : ',idx,i,iszr - else If (desc_ov%glob_to_loc(idx) < -np) Then + else If (desc_ov%idxmap%glob_to_loc(idx) < -np) Then ! ! This is a new index. Assigning a local index as ! we receive them guarantees that all indices for HALO(I) ! will be less than those for HALO(J) whenever I= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& & ':Wrong input to cdbldextbld?',& - & idx,desc_ov%glob_to_loc(idx) + & idx,desc_ov%idxmap%glob_to_loc(idx) End If End Do desc_ov%matrix_data(psb_n_col_) = n_col diff --git a/base/tools/psb_cd_inloc.f90 b/base/tools/psb_cd_inloc.f90 index 6e92dcf0..55f659a2 100644 --- a/base/tools/psb_cd_inloc.f90 +++ b/base/tools/psb_cd_inloc.f90 @@ -288,16 +288,16 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) if (info == 0) then desc%lprm(1) = 0 desc%matrix_data(:) = 0 - desc%matrix_data(psb_desc_size_) = psb_desc_large_ + desc%idxmap%state = psb_desc_large_ end if else - allocate(desc%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& + allocate(desc%idxmap%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& &temp_ovrlap(2*loc_row),desc%lprm(1),& & stat=info) if (info == 0) then desc%lprm(1) = 0 desc%matrix_data(:) = 0 - desc%matrix_data(psb_desc_size_) = psb_desc_normal_ + desc%idxmap%state = psb_desc_normal_ end if end if if (info /= 0) then @@ -310,14 +310,14 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) ! estimate local cols number loc_col = min(2*loc_row,m) - allocate(desc%loc_to_glob(loc_col),stat=info) + allocate(desc%idxmap%loc_to_glob(loc_col),stat=info) if (info /= 0) then info=4025 int_err(1)=loc_col call psb_errpush(info,name,i_err=int_err,a_err='integer') goto 9999 end if - desc%loc_to_glob(:) = -1 + desc%idxmap%loc_to_glob(:) = -1 temp_ovrlap(:) = -1 desc%matrix_data(psb_m_) = m desc%matrix_data(psb_n_) = n @@ -355,7 +355,7 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) itmpov = 0 do k=1, loc_row i = vl(k) - desc%loc_to_glob(k) = i + desc%idxmap%loc_to_glob(k) = i if (check_) then nprocs = tmpgidx(i,2) @@ -414,7 +414,7 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) exit end if - desc%glob_to_loc(i) = -(np+(tmpgidx(i,1)-flag_)+1) + desc%idxmap%glob_to_loc(i) = -(np+(tmpgidx(i,1)-flag_)+1) enddo if (info /= 0) then @@ -427,8 +427,8 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck) itmpov = 0 do k=1, loc_row i = vl(k) - desc%loc_to_glob(k) = i - desc%glob_to_loc(i) = k + desc%idxmap%loc_to_glob(k) = i + desc%idxmap%glob_to_loc(i) = k nprocs = tmpgidx(i,2) if (nprocs > 1) then diff --git a/base/tools/psb_cd_set_bld.f90 b/base/tools/psb_cd_set_bld.f90 index f402d18f..417604fc 100644 --- a/base/tools/psb_cd_set_bld.f90 +++ b/base/tools/psb_cd_set_bld.f90 @@ -96,10 +96,9 @@ subroutine psb_cd_set_bld(desc,info) ! the hash occupancy. ! nc = psb_cd_get_local_cols(desc) - if (.not.associated(desc%hash)) allocate(desc%hash,stat=info) if (info == 0)& - & call psb_hash_init(nc,desc%hash,info) - if (info == 0) call psi_bld_hash(desc,info) + & call psb_hash_init(nc,desc%idxmap%hash,info) + if (info == 0) call psi_bld_g2lmap(desc,info) if (info /= 0) then call psb_errpush(4010,name,a_err='hashInit') goto 9999 diff --git a/base/tools/psb_cdals.f90 b/base/tools/psb_cdals.f90 index 9da543f8..80ce72f5 100644 --- a/base/tools/psb_cdals.f90 +++ b/base/tools/psb_cdals.f90 @@ -132,14 +132,14 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) & temp_ovrlap(2*loc_row),prc_v(np),stat=info) if (info == 0) then desc%matrix_data(:) = 0 - desc%matrix_data(psb_desc_size_) = psb_desc_large_ + desc%idxmap%state = psb_desc_large_ end if else - allocate(desc%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& + allocate(desc%idxmap%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& & temp_ovrlap(2*loc_row),prc_v(np),stat=info) if (info == 0) then desc%matrix_data(:) = 0 - desc%matrix_data(psb_desc_size_) = psb_desc_normal_ + desc%idxmap%state = psb_desc_normal_ end if end if if (info /= 0) then @@ -176,7 +176,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) ! loc_col = max(1,(m+np-1)/np) loc_col = min(2*loc_col,m) - allocate(desc%loc_to_glob(loc_col), desc%lprm(1),& + allocate(desc%idxmap%loc_to_glob(loc_col), desc%lprm(1),& & stat=info) if (info /= 0) then info=4025 @@ -187,7 +187,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) ! set LOC_TO_GLOB array to all "-1" values desc%lprm(1) = 0 - desc%loc_to_glob(:) = -1 + desc%idxmap%loc_to_glob(:) = -1 k = 0 do i=1,m if (info == 0) then @@ -233,13 +233,13 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) if (prc_v(j) == me) then ! this point belongs to me k = k + 1 - call psb_ensure_size((k+1),desc%loc_to_glob,info,pad=-1) + call psb_ensure_size((k+1),desc%idxmap%loc_to_glob,info,pad=-1) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_ensure_size') goto 9999 end if - desc%loc_to_glob(k) = i + desc%idxmap%loc_to_glob(k) = i if (nprocs > 1) then call psb_ensure_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1) if (info /= 0) then @@ -310,7 +310,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) end if end do endif - desc%glob_to_loc(i) = -(np+prc_v(1)+1) + desc%idxmap%glob_to_loc(i) = -(np+prc_v(1)+1) j=1 do if (j > nprocs) exit @@ -321,7 +321,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) if (prc_v(j) == me) then ! this point belongs to me counter=counter+1 - desc%glob_to_loc(i) = counter + desc%idxmap%glob_to_loc(i) = counter if (nprocs > 1) then call psb_ensure_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1) if (info /= 0) then @@ -344,7 +344,7 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) loc_row=counter loc_col=min(2*loc_row,m) - allocate(desc%loc_to_glob(loc_col),& + allocate(desc%idxmap%loc_to_glob(loc_col),& &desc%lprm(1),stat=info) if (info /= 0) then call psb_errpush(4010,name,a_err='Allocate') @@ -353,11 +353,11 @@ subroutine psb_cdals(m, n, parts, ictxt, desc, info) ! set LOC_TO_GLOB array to all "-1" values desc%lprm(1) = 0 - desc%loc_to_glob(:) = -1 + desc%idxmap%loc_to_glob(:) = -1 do i=1,m - k = desc%glob_to_loc(i) + k = desc%idxmap%glob_to_loc(i) if (k > 0) then - desc%loc_to_glob(k) = i + desc%idxmap%loc_to_glob(k) = i endif enddo diff --git a/base/tools/psb_cdalv.f90 b/base/tools/psb_cdalv.f90 index 0965cfaf..ed21e48a 100644 --- a/base/tools/psb_cdalv.f90 +++ b/base/tools/psb_cdalv.f90 @@ -143,14 +143,14 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) &temp_ovrlap(2*loc_row),stat=info) if (info == 0) then desc%matrix_data(:) = 0 - desc%matrix_data(psb_desc_size_) = psb_desc_large_ + desc%idxmap%state = psb_desc_large_ end if else - allocate(desc%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& + allocate(desc%idxmap%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& &temp_ovrlap(2*loc_row),stat=info) if (info == 0) then desc%matrix_data(:) = 0 - desc%matrix_data(psb_desc_size_) = psb_desc_normal_ + desc%idxmap%state = psb_desc_normal_ end if end if if (info /= 0) then @@ -217,7 +217,7 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) ! estimate local cols number loc_col = min(2*loc_row,m) - allocate(desc%loc_to_glob(loc_col), desc%lprm(1),& + allocate(desc%idxmap%loc_to_glob(loc_col), desc%lprm(1),& & stat=info) if (info /= 0) then info=4025 @@ -228,12 +228,12 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) ! set LOC_TO_GLOB array to all "-1" values desc%lprm(1) = 0 - desc%loc_to_glob(:) = -1 + desc%idxmap%loc_to_glob(:) = -1 k = 0 do i=1,m if ((v(i)-flag_) == me) then k = k + 1 - desc%loc_to_glob(k) = i + desc%idxmap%loc_to_glob(k) = i endif enddo @@ -262,9 +262,9 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) if ((v(i)-flag_) == me) then ! this point belongs to me counter=counter+1 - desc%glob_to_loc(i) = counter + desc%idxmap%glob_to_loc(i) = counter else - desc%glob_to_loc(i) = -(np+(v(i)-flag_)+1) + desc%idxmap%glob_to_loc(i) = -(np+(v(i)-flag_)+1) end if enddo @@ -284,7 +284,7 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) ! estimate local cols number loc_col = min(2*loc_row,m) - allocate(desc%loc_to_glob(loc_col),& + allocate(desc%idxmap%loc_to_glob(loc_col),& &desc%lprm(1),stat=info) if (info /= 0) then info=4025 @@ -295,11 +295,11 @@ subroutine psb_cdalv(v, ictxt, desc, info, flag) ! set LOC_TO_GLOB array to all "-1" values desc%lprm(1) = 0 - desc%loc_to_glob(:) = -1 + desc%idxmap%loc_to_glob(:) = -1 do i=1,m - k = desc%glob_to_loc(i) + k = desc%idxmap%glob_to_loc(i) if (k > 0) then - desc%loc_to_glob(k) = i + desc%idxmap%loc_to_glob(k) = i endif enddo diff --git a/base/tools/psb_cdcpy.f90 b/base/tools/psb_cdcpy.f90 index 2c0fc329..4717e0de 100644 --- a/base/tools/psb_cdcpy.f90 +++ b/base/tools/psb_cdcpy.f90 @@ -74,21 +74,22 @@ subroutine psb_cdcpy(desc_in, desc_out, info) endif call psb_safe_ab_cpy(desc_in%matrix_data,desc_out%matrix_data,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%halo_index,desc_out%halo_index,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%ext_index,desc_out%ext_index,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%ovrlap_index,desc_out%ovrlap_index,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%bnd_elem,desc_out%bnd_elem,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%ovrlap_elem,desc_out%ovrlap_elem,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%ovr_mst_idx,desc_out%ovr_mst_idx,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%loc_to_glob,desc_out%loc_to_glob,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%glob_to_loc,desc_out%glob_to_loc,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%lprm,desc_out%lprm,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%idx_space,desc_out%idx_space,info) - desc_out%hashvsize = desc_in%hashvsize - desc_out%hashvmask = desc_in%hashvmask - if (info == 0) call psb_safe_ab_cpy(desc_in%hashv,desc_out%hashv,info) - if (info == 0) call psb_safe_ab_cpy(desc_in%glb_lc,desc_out%glb_lc,info) - if (info == 0) call CloneHashTable(desc_in%hash,desc_out%hash,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%halo_index,desc_out%halo_index,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%ext_index,desc_out%ext_index,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%ovrlap_index,desc_out%ovrlap_index,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%bnd_elem,desc_out%bnd_elem,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%ovrlap_elem,desc_out%ovrlap_elem,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%ovr_mst_idx,desc_out%ovr_mst_idx,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%lprm,desc_out%lprm,info) + if (info == 0) call psb_safe_ab_cpy(desc_in%idx_space,desc_out%idx_space,info) + if (info == 0) call psb_idxmap_copy(desc_in%idxmap,desc_out%idxmap, info) +!!$ if (info == 0) call psb_safe_ab_cpy(desc_in%loc_to_glob,desc_out%loc_to_glob,info) +!!$ if (info == 0) call psb_safe_ab_cpy(desc_in%glob_to_loc,desc_out%glob_to_loc,info) +!!$ desc_out%hashvsize = desc_in%hashvsize +!!$ desc_out%hashvmask = desc_in%hashvmask +!!$ if (info == 0) call psb_safe_ab_cpy(desc_in%hashv,desc_out%hashv,info) +!!$ if (info == 0) call psb_safe_ab_cpy(desc_in%glb_lc,desc_out%glb_lc,info) +!!$ if (info == 0) call CloneHashTable(desc_in%hash,desc_out%hash,info) if (info /= 0) then info = 4010 diff --git a/base/tools/psb_cdprt.f90 b/base/tools/psb_cdprt.f90 index 044ad0a6..33e30b52 100644 --- a/base/tools/psb_cdprt.f90 +++ b/base/tools/psb_cdprt.f90 @@ -69,10 +69,10 @@ subroutine psb_cdprt(iout,desc_p,glob,short) n_row=desc_p%matrix_data(psb_n_row_) n_col=desc_p%matrix_data(psb_n_col_) if (.not.lshort) & - & write(iout,*) 'Loc_to_glob ',desc_p%loc_to_glob(1:n_row), ': ',& - & desc_p%loc_to_glob(n_row+1:n_col) + & write(iout,*) 'Loc_to_glob ',desc_p%idxmap%loc_to_glob(1:n_row), ': ',& + & desc_p%idxmap%loc_to_glob(n_row+1:n_col) -!!$ if (.not.lshort) write(iout,*) 'glob_to_loc ',desc_p%glob_to_loc(1:m) +!!$ if (.not.lshort) write(iout,*) 'glob_to_loc ',desc_p%idxmap%glob_to_loc(1:m) write(iout,*) 'Halo_index' counter = 1 Do @@ -147,16 +147,16 @@ subroutine psb_cdprt(iout,desc_p,glob,short) if (.not.lshort) then write(iout,*) 'Loc_to_glob ' do i=1, n_row - write(iout,*) i, desc_p%loc_to_glob(i) + write(iout,*) i, desc_p%idxmap%loc_to_glob(i) enddo write(iout,*) '........' do i=n_row+1,n_col - write(iout,*) i, desc_p%loc_to_glob(i) + write(iout,*) i, desc_p%idxmap%loc_to_glob(i) enddo !!$ write(iout,*) 'glob_to_loc ' !!$ do i=1,m -!!$ write(iout,*) i,desc_p%glob_to_loc(i) +!!$ write(iout,*) i,desc_p%idxmap%glob_to_loc(i) !!$ enddo endif write(iout,*) 'Halo_index' @@ -170,7 +170,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) if (.not.lshort) then do i=counter+psb_n_elem_recv_+1,counter+psb_n_elem_recv_+n_elem_recv write(iout,*) & - & desc_p%loc_to_glob(desc_p%halo_index(i)),desc_p%halo_index(i) + & desc_p%idxmap%loc_to_glob(desc_p%halo_index(i)),desc_p%halo_index(i) enddo endif write(iout,*) 'Halo_index Send',proc,n_elem_send @@ -178,7 +178,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) do i=counter+n_elem_recv+psb_n_elem_send_+1, & & counter+n_elem_recv+psb_n_elem_send_+n_elem_send write(iout,*) & - & desc_p%loc_to_glob(desc_p%halo_index(i)), desc_p%halo_index(i) + & desc_p%idxmap%loc_to_glob(desc_p%halo_index(i)), desc_p%halo_index(i) enddo endif counter = counter+n_elem_recv+n_elem_send+3 @@ -195,7 +195,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) if (.not.lshort) then do i=counter+psb_n_elem_recv_+1,counter+psb_n_elem_recv_+n_elem_recv write(iout,*) & - & desc_p%loc_to_glob(desc_p%ext_index(i)),desc_p%ext_index(i) + & desc_p%idxmap%loc_to_glob(desc_p%ext_index(i)),desc_p%ext_index(i) enddo endif write(iout,*) 'Ext_index Send',proc,n_elem_send @@ -203,7 +203,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) do i=counter+n_elem_recv+psb_n_elem_send_+1, & & counter+n_elem_recv+psb_n_elem_send_+n_elem_send write(iout,*) & - & desc_p%loc_to_glob(desc_p%ext_index(i)), desc_p%ext_index(i) + & desc_p%idxmap%loc_to_glob(desc_p%ext_index(i)), desc_p%ext_index(i) enddo endif counter = counter+n_elem_recv+n_elem_send+3 @@ -220,7 +220,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) write(iout,*) 'Ovrlap_index Receive',proc,n_elem_recv if (.not.lshort) then do i=counter+psb_n_elem_recv_+1,counter+psb_n_elem_recv_+n_elem_recv - write(iout,*) desc_p%loc_to_glob(desc_p%ovrlap_index(i)),& + write(iout,*) desc_p%idxmap%loc_to_glob(desc_p%ovrlap_index(i)),& & desc_p%ovrlap_index(i) enddo endif @@ -228,7 +228,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) if (.not.lshort) then do i=counter+n_elem_recv+psb_n_elem_send_+1, & & counter+n_elem_recv+psb_n_elem_send_+n_elem_send - write(iout,*) desc_p%loc_to_glob(desc_p%ovrlap_index(i)),& + write(iout,*) desc_p%idxmap%loc_to_glob(desc_p%ovrlap_index(i)),& & desc_p%ovrlap_index(i) enddo endif @@ -241,7 +241,7 @@ subroutine psb_cdprt(iout,desc_p,glob,short) idx = desc_p%ovrlap_elem(counter,1) n_elem_recv = desc_p%ovrlap_elem(counter,2) proc = desc_p%ovrlap_elem(counter,3) - if (.not.lshort) write(iout,*) idx,desc_p%loc_to_glob(idx),n_elem_Recv,proc + if (.not.lshort) write(iout,*) idx,desc_p%idxmap%loc_to_glob(idx),n_elem_Recv,proc enddo end if diff --git a/base/tools/psb_cdren.f90 b/base/tools/psb_cdren.f90 index 697b35ea..620c3061 100644 --- a/base/tools/psb_cdren.f90 +++ b/base/tools/psb_cdren.f90 @@ -137,14 +137,14 @@ subroutine psb_cdren(trans,iperm,desc_a,info) if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': renumbering glob_to_loc' do i=1, n_col - desc_a%glob_to_loc(desc_a%loc_to_glob(desc_a%lprm(i))) = i + desc_a%idxmap%glob_to_loc(desc_a%idxmap%loc_to_glob(desc_a%lprm(i))) = i enddo if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': renumbering loc_to_glob' do i=1,psb_cd_get_global_rows(desc_a) - j = desc_a%glob_to_loc(i) + j = desc_a%idxmap%glob_to_loc(i) if (j>0) then - desc_a%loc_to_glob(j) = i + desc_a%idxmap%loc_to_glob(j) = i endif enddo if (debug_level >= psb_debug_ext_) & diff --git a/base/tools/psb_cdrep.f90 b/base/tools/psb_cdrep.f90 index 9b779db3..9ca610ed 100644 --- a/base/tools/psb_cdrep.f90 +++ b/base/tools/psb_cdrep.f90 @@ -179,8 +179,8 @@ subroutine psb_cdrep(m, ictxt, desc, info) !count local rows number ! allocate work vector - allocate(desc%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& - & desc%loc_to_glob(m),desc%lprm(1),& + allocate(desc%idxmap%glob_to_loc(m),desc%matrix_data(psb_mdata_size_),& + & desc%idxmap%loc_to_glob(m),desc%lprm(1),& & desc%ovrlap_elem(0,3),stat=info) if (info /= 0) then info=4025 @@ -190,7 +190,7 @@ subroutine psb_cdrep(m, ictxt, desc, info) endif ! If the index space is replicated there's no point in not having ! the full map on the current process. - desc%matrix_data(psb_desc_size_) = psb_desc_normal_ + desc%idxmap%state = psb_desc_normal_ desc%matrix_data(psb_m_) = m @@ -202,8 +202,8 @@ subroutine psb_cdrep(m, ictxt, desc, info) desc%matrix_data(psb_dec_type_) = psb_desc_bld_ do i=1,m - desc%glob_to_loc(i) = i - desc%loc_to_glob(i) = i + desc%idxmap%glob_to_loc(i) = i + desc%idxmap%loc_to_glob(i) = i enddo tovr = -1 diff --git a/base/tools/psb_cspins.f90 b/base/tools/psb_cspins.f90 index 51689e61..b5a6c54c 100644 --- a/base/tools/psb_cspins.f90 +++ b/base/tools/psb_cspins.f90 @@ -167,7 +167,7 @@ subroutine psb_cspins(nz,ia,ja,val,a,desc_a,info,rebuild) ncol = psb_cd_get_local_cols(desc_a) if (spstate == psb_spmat_bld_) then - call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%glob_to_loc) + call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%idxmap%glob_to_loc) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -213,7 +213,7 @@ subroutine psb_cspins(nz,ia,ja,val,a,desc_a,info,rebuild) nrow = psb_cd_get_local_rows(desc_a) ncol = psb_cd_get_local_cols(desc_a) call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& - & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) + & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -374,7 +374,7 @@ subroutine psb_cspins_2desc(nz,ia,ja,val,a,desc_ar,desc_ac,info) !!$ nrow = psb_cd_get_local_rows(desc_a) !!$ ncol = psb_cd_get_local_cols(desc_a) !!$ call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& -!!$ & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) +!!$ & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) !!$ if (info /= 0) then !!$ info=4010 !!$ ch_err='psb_coins' diff --git a/base/tools/psb_dcdbldext.F90 b/base/tools/psb_dcdbldext.F90 index d23b1b0b..d3fbdbf0 100644 --- a/base/tools/psb_dcdbldext.F90 +++ b/base/tools/psb_dcdbldext.F90 @@ -255,21 +255,18 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) Do j=0,n_elem_recv-1 idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 endif - - gidx = desc_ov%loc_to_glob(idx) - call psb_ensure_size((cntov_o+3),orig_ovr,info,pad=-1) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_ensure_size') goto 9999 end if - orig_ovr(cntov_o)=proc orig_ovr(cntov_o+1)=1 orig_ovr(cntov_o+2)=gidx @@ -359,14 +356,12 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) end If idx = halo(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 - endif - - gidx = desc_ov%loc_to_glob(idx) - + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -404,12 +399,19 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) ! Do j=0,n_elem_send-1 - idx = halo(counter+psb_elem_send_+j) - gidx = desc_ov%loc_to_glob(idx) - if (idx > psb_cd_get_local_rows(Desc_a)) & - & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& - & idx,psb_cd_get_local_rows(Desc_a) +!!$ idx = halo(counter+psb_elem_send_+j) +!!$ gidx = desc_ov%loc_to_glob(idx) +!!$ if (idx > psb_cd_get_local_rows(Desc_a)) & +!!$ & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& +!!$ & idx,psb_cd_get_local_rows(Desc_a) + idx = halo(counter+psb_elem_send_+j) + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then + info=-3 + call psb_errpush(info,name) + goto 9999 + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -455,9 +457,9 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - Do jj=1,n_elem - works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) - End Do + call psb_map_l2g(blk%ia2(1:n_elem),& + & works(idxs+tot_elem+1:idxs+tot_elem+n_elem),& + & desc_ov%idxmap,info) tot_elem=tot_elem+n_elem End If @@ -469,17 +471,12 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_msort_unique(works(idxs+1:idxs+tot_elem),i) tot_elem=i endif - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) sdsz(proc+1) = tot_elem idxs = idxs + tot_elem end if counter = counter+n_elem_send+3 - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop End',tmp_ovr_idx(1:10) + Enddo if (debug_level >= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& @@ -600,23 +597,23 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) idx=workr(i) if (idx <1) then write(0,*) me,'Error in CDBLDEXTBLD level',i_ovr,' : ',idx,i,iszr - else If (desc_ov%glob_to_loc(idx) < -np) Then + else If (desc_ov%idxmap%glob_to_loc(idx) < -np) Then ! ! This is a new index. Assigning a local index as ! we receive them guarantees that all indices for HALO(I) ! will be less than those for HALO(J) whenever I= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& & ':Wrong input to cdbldextbld?',& - & idx,desc_ov%glob_to_loc(idx) + & idx,desc_ov%idxmap%glob_to_loc(idx) End If End Do desc_ov%matrix_data(psb_n_col_) = n_col diff --git a/base/tools/psb_dspins.f90 b/base/tools/psb_dspins.f90 index d7df6309..0f6e27a3 100644 --- a/base/tools/psb_dspins.f90 +++ b/base/tools/psb_dspins.f90 @@ -165,7 +165,7 @@ subroutine psb_dspins(nz,ia,ja,val,a,desc_a,info,rebuild) ncol = psb_cd_get_local_cols(desc_a) if (spstate == psb_spmat_bld_) then - call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%glob_to_loc) + call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%idxmap%glob_to_loc) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -211,7 +211,7 @@ subroutine psb_dspins(nz,ia,ja,val,a,desc_a,info,rebuild) nrow = psb_cd_get_local_rows(desc_a) ncol = psb_cd_get_local_cols(desc_a) call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& - & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) + & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -372,7 +372,7 @@ subroutine psb_dspins_2desc(nz,ia,ja,val,a,desc_ar,desc_ac,info) !!$ nrow = psb_cd_get_local_rows(desc_a) !!$ ncol = psb_cd_get_local_cols(desc_a) !!$ call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& -!!$ & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) +!!$ & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) !!$ if (info /= 0) then !!$ info=4010 !!$ ch_err='psb_coins' diff --git a/base/tools/psb_icdasb.F90 b/base/tools/psb_icdasb.F90 index 1ad13a1a..ecc6e4ff 100644 --- a/base/tools/psb_icdasb.F90 +++ b/base/tools/psb_icdasb.F90 @@ -118,7 +118,7 @@ subroutine psb_icdasb(desc_a,info,ext_hv) ! cdall(..., vl=vl, globalcheck=.false.) ! do i=1,psb_cd_get_local_cols(desc_a) - if (desc_a%loc_to_glob(i) < 0) then + if (desc_a%idxmap%loc_to_glob(i) < 0) then info=3100 exit endif @@ -129,7 +129,7 @@ subroutine psb_icdasb(desc_a,info,ext_hv) goto 9999 endif ! Trim size of loc_to_glob component. - call psb_realloc(psb_cd_get_local_cols(desc_a),desc_a%loc_to_glob,info) + call psb_realloc(psb_cd_get_local_cols(desc_a),desc_a%idxmap%loc_to_glob,info) ! If large index space, we have to pre-process and rebuild ! the list of halo indices as if it was in small index space diff --git a/base/tools/psb_loc_to_glob.f90 b/base/tools/psb_loc_to_glob.f90 index 7f551149..5846f691 100644 --- a/base/tools/psb_loc_to_glob.f90 +++ b/base/tools/psb_loc_to_glob.f90 @@ -78,33 +78,16 @@ subroutine psb_loc_to_glob2(x,y,desc_a,info,iact) endif act=psb_toupper(act) - n=size(x) - do i=1,n - if ((x(i) > psb_cd_get_local_cols(desc_a)).or.& - & (x(i) <= zero)) then - info=140 - int_err(1)=tmp - int_err(2)=psb_cd_get_local_cols(desc_a) - exit - else - tmp=desc_a%loc_to_glob(x(i)) - if((tmp > zero).or.(tmp <= psb_cd_get_global_rows(desc_a))) then - y(i)=tmp - else - info = 140 - int_err(1)=tmp - int_err(2)=psb_cd_get_local_cols(desc_a) - exit - end if - end if - enddo + call psb_map_l2g(x,y,desc_a%idxmap,info) if (info /= 0) then select case(act) case('E','I') - ! do nothing + ! do nothing, silently. + info = 0 case('W') - write(0,'("Error ",i5," in subroutine glob_to_loc")') info + write(0,'("Error ",i5," in subroutine loc_to_glob")') info + info = 0 case('A') call psb_errpush(info,name) goto 9999 @@ -204,32 +187,16 @@ subroutine psb_loc_to_glob(x,desc_a,info,iact) endif act = psb_toupper(act) - n=size(x) - do i=1,n - if ((x(i) > psb_cd_get_local_cols(desc_a)).or.& - & (x(i) <= zero)) then - info=140 - int_err(1)=x(i) - int_err(2)=psb_cd_get_local_cols(desc_a) - exit - else - tmp=desc_a%loc_to_glob(x(i)) - if((tmp > zero).or.(tmp <= psb_cd_get_global_rows(desc_a))) then - x(i)=tmp - else - info = 140 - exit - end if - end if - enddo + call psb_map_l2g(x,desc_a%idxmap,info) if (info /= 0) then select case(act) case('E','I') -!!$ call psb_erractionrestore(err_act) -!!$ return + ! do nothing, silently. + info = 0 case('W') - write(0,'("Error ",i5," in subroutine glob_to_loc")') info + write(0,'("Error ",i5," in subroutine loc_to_glob")') info + info = 0 case('A') call psb_errpush(info,name) goto 9999 diff --git a/base/tools/psb_scdbldext.F90 b/base/tools/psb_scdbldext.F90 index ee88b10b..4d897b9a 100644 --- a/base/tools/psb_scdbldext.F90 +++ b/base/tools/psb_scdbldext.F90 @@ -255,21 +255,18 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) Do j=0,n_elem_recv-1 idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 endif - - gidx = desc_ov%loc_to_glob(idx) - call psb_ensure_size((cntov_o+3),orig_ovr,info,pad=-1) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_ensure_size') goto 9999 end if - orig_ovr(cntov_o)=proc orig_ovr(cntov_o+1)=1 orig_ovr(cntov_o+2)=gidx @@ -352,21 +349,19 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) ! add recv elements in halo_index into ovrlap_index ! Do j=0,n_elem_recv-1 - If((counter+psb_elem_recv_+j)>Size(halo)) then + If ((counter+psb_elem_recv_+j)>Size(halo)) then info=-2 call psb_errpush(info,name) goto 9999 end If idx = halo(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 - endif - - gidx = desc_ov%loc_to_glob(idx) - + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -404,12 +399,19 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) ! Do j=0,n_elem_send-1 - idx = halo(counter+psb_elem_send_+j) - gidx = desc_ov%loc_to_glob(idx) - if (idx > psb_cd_get_local_rows(Desc_a)) & - & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& - & idx,psb_cd_get_local_rows(Desc_a) +!!$ idx = halo(counter+psb_elem_send_+j) +!!$ gidx = desc_ov%loc_to_glob(idx) +!!$ if (idx > psb_cd_get_local_rows(Desc_a)) & +!!$ & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& +!!$ & idx,psb_cd_get_local_rows(Desc_a) + idx = halo(counter+psb_elem_send_+j) + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then + info=-3 + call psb_errpush(info,name) + goto 9999 + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -455,9 +457,9 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - Do jj=1,n_elem - works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) - End Do + call psb_map_l2g(blk%ia2(1:n_elem),& + & works(idxs+tot_elem+1:idxs+tot_elem+n_elem),& + & desc_ov%idxmap,info) tot_elem=tot_elem+n_elem End If @@ -469,17 +471,12 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_msort_unique(works(idxs+1:idxs+tot_elem),i) tot_elem=i endif - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) sdsz(proc+1) = tot_elem idxs = idxs + tot_elem end if counter = counter+n_elem_send+3 - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop End',tmp_ovr_idx(1:10) + Enddo if (debug_level >= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& @@ -600,23 +597,23 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) idx=workr(i) if (idx <1) then write(0,*) me,'Error in CDBLDEXTBLD level',i_ovr,' : ',idx,i,iszr - else If (desc_ov%glob_to_loc(idx) < -np) Then + else If (desc_ov%idxmap%glob_to_loc(idx) < -np) Then ! ! This is a new index. Assigning a local index as ! we receive them guarantees that all indices for HALO(I) ! will be less than those for HALO(J) whenever I= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& & ':Wrong input to cdbldextbld?',& - & idx,desc_ov%glob_to_loc(idx) + & idx,desc_ov%idxmap%glob_to_loc(idx) End If End Do desc_ov%matrix_data(psb_n_col_) = n_col diff --git a/base/tools/psb_sspins.f90 b/base/tools/psb_sspins.f90 index d1359e31..0fbff3b1 100644 --- a/base/tools/psb_sspins.f90 +++ b/base/tools/psb_sspins.f90 @@ -165,7 +165,7 @@ subroutine psb_sspins(nz,ia,ja,val,a,desc_a,info,rebuild) ncol = psb_cd_get_local_cols(desc_a) if (spstate == psb_spmat_bld_) then - call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%glob_to_loc) + call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%idxmap%glob_to_loc) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -211,7 +211,7 @@ subroutine psb_sspins(nz,ia,ja,val,a,desc_a,info,rebuild) nrow = psb_cd_get_local_rows(desc_a) ncol = psb_cd_get_local_cols(desc_a) call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& - & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) + & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -372,7 +372,7 @@ subroutine psb_sspins_2desc(nz,ia,ja,val,a,desc_ar,desc_ac,info) !!$ nrow = psb_cd_get_local_rows(desc_a) !!$ ncol = psb_cd_get_local_cols(desc_a) !!$ call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& -!!$ & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) +!!$ & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) !!$ if (info /= 0) then !!$ info=4010 !!$ ch_err='psb_coins' diff --git a/base/tools/psb_zcdbldext.F90 b/base/tools/psb_zcdbldext.F90 index 9b2e13e6..8c3cc07b 100644 --- a/base/tools/psb_zcdbldext.F90 +++ b/base/tools/psb_zcdbldext.F90 @@ -255,21 +255,18 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) Do j=0,n_elem_recv-1 idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 endif - - gidx = desc_ov%loc_to_glob(idx) - call psb_ensure_size((cntov_o+3),orig_ovr,info,pad=-1) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_ensure_size') goto 9999 end if - orig_ovr(cntov_o)=proc orig_ovr(cntov_o+1)=1 orig_ovr(cntov_o+2)=gidx @@ -352,21 +349,19 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) ! add recv elements in halo_index into ovrlap_index ! Do j=0,n_elem_recv-1 - If((counter+psb_elem_recv_+j)>Size(halo)) then + If ((counter+psb_elem_recv_+j)>Size(halo)) then info=-2 call psb_errpush(info,name) goto 9999 end If idx = halo(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then info=-3 call psb_errpush(info,name) goto 9999 - endif - - gidx = desc_ov%loc_to_glob(idx) - + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -404,12 +399,19 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) ! Do j=0,n_elem_send-1 - idx = halo(counter+psb_elem_send_+j) - gidx = desc_ov%loc_to_glob(idx) - if (idx > psb_cd_get_local_rows(Desc_a)) & - & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& - & idx,psb_cd_get_local_rows(Desc_a) +!!$ idx = halo(counter+psb_elem_send_+j) +!!$ gidx = desc_ov%loc_to_glob(idx) +!!$ if (idx > psb_cd_get_local_rows(Desc_a)) & +!!$ & write(debug_unit,*) me,' ',trim(name),':Out of local rows ',i_ovr,& +!!$ & idx,psb_cd_get_local_rows(Desc_a) + idx = halo(counter+psb_elem_send_+j) + call psb_map_l2g(idx,gidx,desc_ov%idxmap,info) + If (gidx < 0) then + info=-3 + call psb_errpush(info,name) + goto 9999 + endif call psb_ensure_size((counter_o+3),tmp_ovr_idx,info,pad=-1) if (info /= 0) then info=4010 @@ -455,9 +457,9 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - Do jj=1,n_elem - works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) - End Do + call psb_map_l2g(blk%ia2(1:n_elem),& + & works(idxs+tot_elem+1:idxs+tot_elem+n_elem),& + & desc_ov%idxmap,info) tot_elem=tot_elem+n_elem End If @@ -469,17 +471,12 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) call psb_msort_unique(works(idxs+1:idxs+tot_elem),i) tot_elem=i endif - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) sdsz(proc+1) = tot_elem idxs = idxs + tot_elem end if counter = counter+n_elem_send+3 - if (debug_level >= psb_debug_outer_) & - & write(debug_unit,*) me,' ',trim(name),& - & ':Checktmp_o_i Loop End',tmp_ovr_idx(1:10) + Enddo if (debug_level >= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& @@ -600,23 +597,23 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) idx=workr(i) if (idx <1) then write(0,*) me,'Error in CDBLDEXTBLD level',i_ovr,' : ',idx,i,iszr - else If (desc_ov%glob_to_loc(idx) < -np) Then + else If (desc_ov%idxmap%glob_to_loc(idx) < -np) Then ! ! This is a new index. Assigning a local index as ! we receive them guarantees that all indices for HALO(I) ! will be less than those for HALO(J) whenever I= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& & ':Wrong input to cdbldextbld?',& - & idx,desc_ov%glob_to_loc(idx) + & idx,desc_ov%idxmap%glob_to_loc(idx) End If End Do desc_ov%matrix_data(psb_n_col_) = n_col diff --git a/base/tools/psb_zspins.f90 b/base/tools/psb_zspins.f90 index bd74ac33..00994cf7 100644 --- a/base/tools/psb_zspins.f90 +++ b/base/tools/psb_zspins.f90 @@ -167,7 +167,7 @@ subroutine psb_zspins(nz,ia,ja,val,a,desc_a,info,rebuild) ncol = psb_cd_get_local_cols(desc_a) if (spstate == psb_spmat_bld_) then - call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%glob_to_loc) + call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,info,gtl=desc_a%idxmap%glob_to_loc) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -213,7 +213,7 @@ subroutine psb_zspins(nz,ia,ja,val,a,desc_a,info,rebuild) nrow = psb_cd_get_local_rows(desc_a) ncol = psb_cd_get_local_cols(desc_a) call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& - & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) + & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) if (info /= 0) then info=4010 ch_err='psb_coins' @@ -374,7 +374,7 @@ subroutine psb_zspins_2desc(nz,ia,ja,val,a,desc_ar,desc_ac,info) !!$ nrow = psb_cd_get_local_rows(desc_a) !!$ ncol = psb_cd_get_local_cols(desc_a) !!$ call psb_coins(nz,ia,ja,val,a,1,nrow,1,ncol,& -!!$ & info,gtl=desc_a%glob_to_loc,rebuild=rebuild_) +!!$ & info,gtl=desc_a%idxmap%glob_to_loc,rebuild=rebuild_) !!$ if (info /= 0) then !!$ info=4010 !!$ ch_err='psb_coins'