From f03078bfdb651a0d7e9833f3c03990bda75a2112 Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Wed, 6 Dec 2006 14:06:06 +0000 Subject: [PATCH] Fixed allocation for large index spaces. --- src/tools/psb_cdall.f90 | 279 +++++++++++++++++++++++++++------------- src/tools/psb_cddec.f90 | 140 +++++++++++++------- 2 files changed, 281 insertions(+), 138 deletions(-) diff --git a/src/tools/psb_cdall.f90 b/src/tools/psb_cdall.f90 index deeb6d8e..a7ca87ef 100644 --- a/src/tools/psb_cdall.f90 +++ b/src/tools/psb_cdall.f90 @@ -57,7 +57,7 @@ subroutine psb_cdall(m, n, parts, ictxt, desc_a, info) !locals Integer :: counter,i,j,np,me,loc_row,err,loc_col,nprocs,& - & l_ov_ix,l_ov_el,idx, err_act, itmpov, k, ns + & l_ov_ix,l_ov_el,idx, err_act, itmpov, k, ns, glx integer :: int_err(5),exch(2) integer, allocatable :: prc_v(:), temp_ovrlap(:), ov_idx(:),ov_el(:) logical, parameter :: debug=.false. @@ -116,13 +116,17 @@ subroutine psb_cdall(m, n, parts, ictxt, desc_a, info) !count local rows number ! allocate work vector - allocate(prc_v(np),desc_a%glob_to_loc(m),& - &desc_a%matrix_data(psb_mdata_size_),temp_ovrlap(m),stat=info) - if (info /= no_err) then + if (m > psb_cd_get_large_threshold()) then + allocate(desc_a%matrix_data(psb_mdata_size_),& + & temp_ovrlap(m),prc_v(np),stat=info) + else + allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& + & temp_ovrlap(m),prc_v(np),stat=info) + end if + if (info /= 0) then info=2025 - err=info int_err(1)=m - call psb_errpush(err,name,int_err) + call psb_errpush(info,name,i_err=int_err) goto 9999 endif @@ -131,76 +135,188 @@ subroutine psb_cdall(m, n, parts, ictxt, desc_a, info) counter = 0 itmpov = 0 temp_ovrlap(:) = -1 - do i=1,m - if (info == 0) then - call parts(i,m,np,prc_v,nprocs) - if (nprocs > np) then - info=570 - int_err(1)=3 - int_err(2)=np - int_err(3)=nprocs - int_err(4)=i - err=info - call psb_errpush(err,name,int_err) - goto 9999 - else if (nprocs <= 0) then - info=575 - int_err(1)=3 - int_err(2)=nprocs - int_err(3)=i - err=info - call psb_errpush(err,name,int_err) - goto 9999 - else - do j=1,nprocs - if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then - info=580 - int_err(1)=3 - int_err(2)=prc_v(j) - int_err(3)=i - err=info - call psb_errpush(err,name,int_err) - goto 9999 - end if - end do - endif - desc_a%glob_to_loc(i) = -(np+prc_v(1)+1) - j=1 - do - if (j > nprocs) exit - if (prc_v(j) == me) exit - j=j+1 - enddo - if (j <= nprocs) then - if (prc_v(j) == me) then - ! this point belongs to me - counter=counter+1 - desc_a%glob_to_loc(i) = counter - if (nprocs > 1) then - if ((itmpov+2+nprocs) > size(temp_ovrlap)) then - ns = max(itmpov+2+nprocs,int(1.25*size(temp_ovrlap))) - call psb_realloc(ns,temp_ovrlap,info,pad=-1) - if (info /= 0) then - info=2025 - int_err(1)=m - err=info - call psb_errpush(err,name,int_err) + if ( m >psb_cd_get_large_threshold()) then + desc_a%matrix_data(psb_dec_type_) = psb_desc_large_bld_ + loc_col = (m+np-1)/np + allocate(desc_a%loc_to_glob(loc_col), desc_a%lprm(1),& + & desc_a%ptree(2),stat=info) + if (info == 0) call InitPairSearchTree(desc_a%ptree,info) + if (info /= 0) then + info=2025 + int_err(1)=loc_col + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + + ! set LOC_TO_GLOB array to all "-1" values + desc_a%lprm(1) = 0 + desc_a%loc_to_glob(:) = -1 + k = 0 + do i=1,m + if (info == 0) then + call parts(i,m,np,prc_v,nprocs) + if (nprocs > np) then + info=570 + int_err(1)=3 + int_err(2)=np + int_err(3)=nprocs + int_err(4)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else if (nprocs <= 0) then + info=575 + int_err(1)=3 + int_err(2)=nprocs + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else + do j=1,nprocs + if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then + info=580 + int_err(1)=3 + int_err(2)=prc_v(j) + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + end if + end do + endif + j=1 + do + if (j > nprocs) exit + if (prc_v(j) == me) exit + j=j+1 + enddo + + if (j <= nprocs) then + if (prc_v(j) == me) then + ! this point belongs to me + k = k + 1 + call psb_check_size((k+1),desc_a%loc_to_glob,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + desc_a%loc_to_glob(k) = i + call SearchInsKeyVal(desc_a%ptree,i,k,glx,info) + if (nprocs > 1) then + call psb_check_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') goto 9999 - endif + end if + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = i + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = nprocs + temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs) + itmpov = itmpov + nprocs endif - itmpov = itmpov + 1 - temp_ovrlap(itmpov) = i - itmpov = itmpov + 1 - temp_ovrlap(itmpov) = nprocs - temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs) - itmpov = itmpov + nprocs - endif - end if + end if + end if end if + enddo + if (info /= 0) then + info=4000 + call psb_errpush(info,name) + goto 9999 endif - enddo + loc_row = k + else + + desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ + do i=1,m + if (info == 0) then + call parts(i,m,np,prc_v,nprocs) + if (nprocs > np) then + info=570 + int_err(1)=3 + int_err(2)=np + int_err(3)=nprocs + int_err(4)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else if (nprocs <= 0) then + info=575 + int_err(1)=3 + int_err(2)=nprocs + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else + do j=1,nprocs + if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then + info=580 + int_err(1)=3 + int_err(2)=prc_v(j) + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + end if + end do + endif + desc_a%glob_to_loc(i) = -(np+prc_v(1)+1) + j=1 + do + if (j > nprocs) exit + if (prc_v(j) == me) exit + j=j+1 + enddo + if (j <= nprocs) then + if (prc_v(j) == me) then + ! this point belongs to me + counter=counter+1 + desc_a%glob_to_loc(i) = counter + if (nprocs > 1) then + call psb_check_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = i + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = nprocs + temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs) + itmpov = itmpov + nprocs + endif + end if + end if + endif + enddo + ! estimate local cols number + loc_row=counter + loc_col=min(2*loc_row,m) + + allocate(desc_a%loc_to_glob(loc_col),& + &desc_a%lprm(1),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if + + ! set LOC_TO_GLOB array to all "-1" values + desc_a%lprm(1) = 0 + desc_a%loc_to_glob(:) = -1 + do i=1,m + k = desc_a%glob_to_loc(i) + if (k > 0) then + desc_a%loc_to_glob(k) = i + endif + enddo + + end if - loc_row=counter ! check on parts function if (debug) write(*,*) 'PSB_CDALL: End main loop:' ,loc_row,itmpov,info @@ -267,25 +383,6 @@ subroutine psb_cdall(m, n, parts, ictxt, desc_a, info) call psb_errpush(err,name) Goto 9999 endif - ! estimate local cols number - loc_col=min(2*loc_row,m) - - allocate(desc_a%loc_to_glob(loc_col),& - &desc_a%lprm(1),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if - - ! set LOC_TO_GLOB array to all "-1" values - desc_a%lprm(1) = 0 - desc_a%loc_to_glob(:) = -1 - do i=1,m - k = desc_a%glob_to_loc(i) - if (k > 0) then - desc_a%loc_to_glob(k) = i - endif - enddo ! set fields in desc_a%MATRIX_DATA.... desc_a%matrix_data(psb_n_row_) = loc_row @@ -301,10 +398,8 @@ subroutine psb_cdall(m, n, parts, ictxt, desc_a, info) desc_a%halo_index(:) = -1 - desc_a%matrix_data(psb_m_) = m desc_a%matrix_data(psb_n_) = n - desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ desc_a%matrix_data(psb_ctxt_) = ictxt call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_)) diff --git a/src/tools/psb_cddec.f90 b/src/tools/psb_cddec.f90 index ddfcd950..9fea6a97 100644 --- a/src/tools/psb_cddec.f90 +++ b/src/tools/psb_cddec.f90 @@ -117,7 +117,7 @@ subroutine psb_cddec(nloc, ictxt, desc_a, info) !locals Integer :: i,j,np,me,err,n,itmpov, k,& - & l_ov_ix,l_ov_el,idx, err_act,m, ip + & l_ov_ix,l_ov_el,idx, err_act,m, ip,glx Integer :: INT_ERR(5), thalo(1), tovr(1) integer, allocatable :: nlv(:) logical, parameter :: debug=.false. @@ -164,57 +164,105 @@ subroutine psb_cddec(nloc, ictxt, desc_a, info) !count local rows number - ! allocate work vector -!!$ allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& -!!$ & desc_a%loc_to_glob(nloc),desc_a%lprm(1),& -!!$ & desc_a%ovrlap_index(1),desc_a%ovrlap_elem(1),& -!!$ & desc_a%halo_index(1),desc_a%bnd_elem(1),stat=info) - allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& - & desc_a%loc_to_glob(m),desc_a%lprm(1),stat=info) - if (info /= 0) then - info=2025 - int_err(1)=m - call psb_errpush(info,name,i_err=int_err) - goto 9999 - endif + if ( m >psb_cd_get_large_threshold()) then + allocate(desc_a%loc_to_glob(nloc), desc_a%lprm(1),& + & desc_a%ptree(2),desc_a%matrix_data(psb_mdata_size_),stat=info) + if (info == 0) call InitPairSearchTree(desc_a%ptree,info) + if (info /= 0) then + info=2025 + int_err(1)=nloc + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + + ! set LOC_TO_GLOB array to all "-1" values + desc_a%lprm(1) = 0 + desc_a%loc_to_glob(:) = -1 + desc_a%matrix_data(psb_n_row_) = nloc + desc_a%matrix_data(psb_n_col_) = nloc + desc_a%matrix_data(psb_m_) = m + desc_a%matrix_data(psb_n_) = m + desc_a%matrix_data(psb_dec_type_) = psb_desc_large_bld_ + desc_a%matrix_data(psb_ctxt_) = ictxt + call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_)) + + do ip=0, np-1 + if (ip==me) then + do i=1, nlv(ip) + call SearchInsKeyVal(desc_a%ptree,j,i,glx,info) + desc_a%loc_to_glob(i) = j + j = j + 1 + enddo + else + do i=1, nlv(ip) + j = j + 1 + enddo + endif + enddo + + tovr = -1 + thalo = -1 + + desc_a%lprm(:) = 0 + + call psi_cnv_dsc(thalo,tovr,desc_a,info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='psi_bld_cdesc') + goto 9999 + end if + + desc_a%matrix_data(psb_dec_type_) = psb_desc_large_asb_ + else - desc_a%matrix_data(psb_n_row_) = nloc - desc_a%matrix_data(psb_n_col_) = nloc - desc_a%matrix_data(psb_m_) = m - desc_a%matrix_data(psb_n_) = m - desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ - desc_a%matrix_data(psb_ctxt_) = ictxt - call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_)) - - j = 1 - do ip=0, np-1 - if (ip==me) then - do i=1, nlv(ip) - desc_a%glob_to_loc(j) = i - desc_a%loc_to_glob(i) = j - j = j + 1 - enddo - else - do i=1, nlv(ip) - desc_a%glob_to_loc(j) = -(np+ip+1) - j = j + 1 - enddo + allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& + & desc_a%loc_to_glob(m),desc_a%lprm(1),stat=info) + if (info /= 0) then + info=2025 + int_err(1)=m + call psb_errpush(info,name,i_err=int_err) + goto 9999 endif - enddo - tovr = -1 - thalo = -1 - - desc_a%lprm(:) = 0 - call psi_cnv_dsc(thalo,tovr,desc_a,info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='psi_bld_cdesc') - goto 9999 - end if + desc_a%matrix_data(psb_n_row_) = nloc + desc_a%matrix_data(psb_n_col_) = nloc + desc_a%matrix_data(psb_m_) = m + desc_a%matrix_data(psb_n_) = m + desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ + desc_a%matrix_data(psb_ctxt_) = ictxt + call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_)) + + j = 1 + do ip=0, np-1 + if (ip==me) then + do i=1, nlv(ip) + desc_a%glob_to_loc(j) = i + desc_a%loc_to_glob(i) = j + j = j + 1 + enddo + else + do i=1, nlv(ip) + desc_a%glob_to_loc(j) = -(np+ip+1) + j = j + 1 + enddo + endif + enddo + + tovr = -1 + thalo = -1 - desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_ + desc_a%lprm(:) = 0 + + call psi_cnv_dsc(thalo,tovr,desc_a,info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='psi_bld_cdesc') + goto 9999 + end if + + desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_ + + endif call psb_erractionrestore(err_act) return