diff --git a/base/modules/psb_const_mod.f90 b/base/modules/psb_const_mod.f90 index 7ede5c7f..63cecdc7 100644 --- a/base/modules/psb_const_mod.f90 +++ b/base/modules/psb_const_mod.f90 @@ -58,21 +58,24 @@ module psb_const_mod ! integer, parameter :: psb_dec_type_=1, psb_m_=2,psb_n_=3 integer, parameter :: psb_n_row_=4, psb_n_col_=5,psb_ctxt_=6 - integer, parameter :: psb_loc_to_glob_=7 + integer, parameter :: psb_desc_size_=7 + integer, parameter :: psb_ovl_state_=8 + integer, parameter :: psb_mpi_c_=9 integer, parameter :: psb_thal_xch_=11 integer, parameter :: psb_thal_snd_=12 integer, parameter :: psb_thal_rcv_=13 integer, parameter :: psb_tovr_xch_=14 integer, parameter :: psb_tovr_snd_=15 integer, parameter :: psb_tovr_rcv_=16 - integer, parameter :: psb_mpi_c_=9,psb_mdata_size_=20 + integer, parameter :: psb_mdata_size_=20 integer, parameter :: psb_desc_asb_=3099 integer, parameter :: psb_desc_bld_=psb_desc_asb_+1 integer, parameter :: psb_desc_repl_=3199 integer, parameter :: psb_desc_upd_=psb_desc_bld_+1 - integer, parameter :: psb_desc_upd_asb_=psb_desc_upd_+1 - integer, parameter :: psb_desc_large_asb_=psb_desc_upd_asb_+1 - integer, parameter :: psb_desc_large_bld_=psb_desc_large_asb_+1 + integer, parameter :: psb_desc_normal_=3299 + integer, parameter :: psb_desc_large_=psb_desc_normal_+1 + integer, parameter :: psb_cd_ovl_bld_=3399 + integer, parameter :: psb_cd_ovl_asb_=psb_cd_ovl_bld_+1 integer, parameter :: nbits=14 integer, parameter :: hashsize=2**nbits, hashmask=hashsize-1 integer, parameter :: psb_default_large_threshold=4*1024*1024 ! to be reviewed diff --git a/base/modules/psb_desc_type.f90 b/base/modules/psb_desc_type.f90 index 8d8e0ba0..b0e13796 100644 --- a/base/modules/psb_desc_type.f90 +++ b/base/modules/psb_desc_type.f90 @@ -111,7 +111,7 @@ contains logical function psb_is_large_desc(desc) type(psb_desc_type), intent(in) :: desc - psb_is_large_desc = psb_is_large_dec(psb_cd_get_dectype(desc)) + psb_is_large_desc =(psb_desc_large_==psb_cd_get_size(desc)) end function psb_is_large_desc @@ -122,12 +122,6 @@ contains end function psb_is_upd_desc - logical function psb_is_asb_upd_desc(desc) - type(psb_desc_type), intent(in) :: desc - - psb_is_asb_upd_desc = psb_is_asb_upd_dec(psb_cd_get_dectype(desc)) - - end function psb_is_asb_upd_desc logical function psb_is_asb_desc(desc) type(psb_desc_type), intent(in) :: desc @@ -141,16 +135,14 @@ contains integer :: dectype psb_is_ok_dec = ((dectype == psb_desc_asb_).or.(dectype == psb_desc_bld_).or.& - &(dectype == psb_desc_upd_).or.(dectype== psb_desc_upd_asb_).or.& - &(dectype == psb_desc_large_asb_).or.(dectype == psb_desc_large_bld_).or.& + &(dectype == psb_desc_upd_).or.& &(dectype== psb_desc_repl_)) end function psb_is_ok_dec logical function psb_is_bld_dec(dectype) integer :: dectype - psb_is_bld_dec = (dectype == psb_desc_bld_)& - & .or.(dectype == psb_desc_large_bld_) + psb_is_bld_dec = (dectype == psb_desc_bld_) end function psb_is_bld_dec logical function psb_is_upd_dec(dectype) @@ -160,18 +152,11 @@ contains end function psb_is_upd_dec - logical function psb_is_asb_upd_dec(dectype) - integer :: dectype - - psb_is_asb_upd_dec = (dectype == psb_desc_upd_asb_) - end function psb_is_asb_upd_dec - - logical function psb_is_asb_dec(dectype) + logical function psb_is_asb_dec(dectype) integer :: dectype - psb_is_asb_dec = (dectype == psb_desc_asb_)& - & .or.(dectype == psb_desc_large_asb_).or.& + psb_is_asb_dec = (dectype == psb_desc_asb_).or.& & (dectype== psb_desc_repl_) end function psb_is_asb_dec @@ -213,19 +198,79 @@ contains psb_cd_get_dectype = desc%matrix_data(psb_dec_type_) end function psb_cd_get_dectype + integer function psb_cd_get_size(desc) + type(psb_desc_type), intent(in) :: desc + + psb_cd_get_size = desc%matrix_data(psb_desc_size_) + end function psb_cd_get_size + integer function psb_cd_get_mpic(desc) type(psb_desc_type), intent(in) :: desc psb_cd_get_mpic = desc%matrix_data(psb_mpi_c_) end function psb_cd_get_mpic - - logical function psb_is_large_dec(dectype) - integer :: dectype - psb_is_large_dec = (dectype == psb_desc_large_asb_)& - & .or.(dectype == psb_desc_large_bld_) - end function psb_is_large_dec + subroutine psb_cd_set_bld(desc,info) + ! + ! Change state of a descriptor into BUILD. + ! If the descriptor is LARGE, check the AVL search tree + ! and initialize it if necessary. + ! + use psb_const_mod + use psb_error_mod + use psb_penv_mod + + implicit none + type(psb_desc_type), intent(inout) :: desc + integer :: info + !locals + integer :: np,me,ictxt, isz, err_act,idx,gidx,lidx + logical, parameter :: debug=.false.,debugprt=.false. + character(len=20) :: name, char_err + if (debug) write(0,*) me,'Entered CDCPY' + if (psb_get_errstatus() /= 0) return + info = 0 + call psb_erractionsave(err_act) + name = 'psb_cd_set_bld' + + ictxt = psb_cd_get_context(desc) + + ! check on blacs grid + call psb_info(ictxt, me, np) + if (debug) write(0,*) me,'Entered CDCPY' + + if (psb_is_large_desc(desc)) then + if (.not.allocated(desc%ptree)) then + allocate(desc%ptree(2),stat=info) + if (info /= 0) then + info=4000 + goto 9999 + endif + call InitPairSearchTree(desc%ptree,info) + do idx=1, psb_cd_get_local_cols(desc) + gidx = desc%loc_to_glob(idx) + call SearchInsKeyVal(desc%ptree,gidx,idx,lidx,info) + if (lidx /= idx) then + write(0,*) 'Warning from cdset: mismatch in PTREE ',idx,lidx + endif + enddo + end if + end if + desc%matrix_data(psb_dec_type_) = psb_desc_bld_ + + call psb_erractionrestore(err_act) + return +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == act_ret) then + return + else + call psb_error(ictxt) + end if + return + end subroutine psb_cd_set_bld end module psb_descriptor_type diff --git a/base/modules/psb_tools_mod.f90 b/base/modules/psb_tools_mod.f90 index 0e99ab01..ce136fd2 100644 --- a/base/modules/psb_tools_mod.f90 +++ b/base/modules/psb_tools_mod.f90 @@ -706,7 +706,7 @@ contains call psb_sum(ictxt,itmpsz) nlp=0 do i=0, me-1 - nlp = nlp + itmpsz(me) + nlp = nlp + itmpsz(i) end do call psb_cd_inloc((/(i,i=nlp+1,nlp+nl)/),ictxt,desc_a,info) diff --git a/base/tools/psb_cd_inloc.f90 b/base/tools/psb_cd_inloc.f90 index 4bdf979a..ffc13766 100644 --- a/base/tools/psb_cd_inloc.f90 +++ b/base/tools/psb_cd_inloc.f90 @@ -56,7 +56,7 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) Integer :: counter,i,j,np,me,loc_row,err,& & loc_col,nprocs,n,itmpov, k,glx,gidx,gle,& & l_ov_ix,l_ov_el,idx, flag_, err_act,m - integer :: int_err(5),exch(2) + integer :: int_err(5),exch(3) Integer, allocatable :: temp_ovrlap(:), ov_idx(:),ov_el(:),tmpgidx(:,:) logical, parameter :: debug=.false. character(len=20) :: name @@ -64,16 +64,16 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) if(psb_get_errstatus() /= 0) return info=0 err=0 - name = 'psb_cdalv' + name = 'psb_cd_inloc' call psb_info(ictxt, me, np) if (debug) write(*,*) 'psb_cdall: ',np,me - loc_row=size(v) - - m = loc_row + loc_row = size(v) + m = loc_row call psb_sum(ictxt,m) + n = m !... check m and n parameters.... if (m < 1) then @@ -90,6 +90,26 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) call psb_errpush(info,name,i_err=int_err) goto 9999 end if + if (me == psb_root_) then + exch(1)=m + exch(2)=n + exch(3)=psb_cd_get_large_threshold() + call psb_bcast(ictxt,exch(1:3),root=psb_root_) + else + call psb_bcast(ictxt,exch(1:3),root=psb_root_) + if (exch(1) /= m) then + err=550 + int_err(1)=1 + call psb_errpush(err,name,int_err) + goto 9999 + else if (exch(2) /= n) then + err=550 + int_err(1)=2 + call psb_errpush(err,name,int_err) + goto 9999 + endif + call psb_cd_set_large_threshold(exch(3)) + endif if (debug) write(*,*) 'psb_cdall: doing global checks' allocate(tmpgidx(m,2),stat=info) @@ -98,18 +118,30 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) call psb_errpush(info,name,i_err=int_err) goto 9999 end if + + + tmpgidx=0 + flag_=1 do i=1,loc_row if ((v(i)<1).or.(v(i)>m)) then info = 551 + int_err(1) = i + int_err(2) = v(i) + int_err(3) = loc_row + int_err(4) = m else - tmpgidx(v(i),1) = me + tmpgidx(v(i),1) = me+flag_ tmpgidx(v(i),2) = 1 endif end do call psb_amx(ictxt,tmpgidx) - if (count(tmpgidx(:,2) == 0)>0) then - info = 552 + if (info ==0) then + int_err(1) = count(tmpgidx(:,2) == 0) + int_err(2) = m + if (int_err(1)>0) then + info = 552 + end if end if if (info /= 0) then @@ -120,16 +152,17 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) call psb_nullify_desc(desc_a) - flag_=0 !count local rows number ! allocate work vector if (m > psb_cd_get_large_threshold()) then allocate(desc_a%matrix_data(psb_mdata_size_),& &temp_ovrlap(m),stat=info) + desc_a%matrix_data(psb_desc_size_) = psb_desc_large_ else allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& &temp_ovrlap(m),stat=info) + desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_ end if if (info /= 0) then info=2025 @@ -145,7 +178,6 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) temp_ovrlap(:) = -1 if (m > psb_cd_get_large_threshold()) then - desc_a%matrix_data(psb_dec_type_) = psb_desc_large_bld_ do i=1,m @@ -210,7 +242,6 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) else - desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ do i=1,m if (((tmpgidx(i,1)-flag_) > np-1).or.((tmpgidx(i,1)-flag_) < 0)) then @@ -327,13 +358,14 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info) ! set fields in desc_a%MATRIX_DATA.... desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row + call psb_cd_set_bld(desc_a,info) - allocate(desc_a%halo_index(1),stat=info) - if (info /= 0) then - info=4000 - call psb_errpush(info,name) - goto 9999 - endif + call psb_realloc(1,desc_a%halo_index, info) + if (info /= no_err) then + info=2025 + call psb_errpush(err,name,a_err='psb_realloc') + Goto 9999 + end if desc_a%halo_index(:) = -1 diff --git a/base/tools/psb_cdals.f90 b/base/tools/psb_cdals.f90 index 3018a6cd..dc6a3e81 100644 --- a/base/tools/psb_cdals.f90 +++ b/base/tools/psb_cdals.f90 @@ -57,11 +57,11 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info) !locals Integer :: counter,i,j,np,me,loc_row,err,loc_col,nprocs,& - & l_ov_ix,l_ov_el,idx, err_act, itmpov, k, ns - integer :: int_err(5),exch(2) + & l_ov_ix,l_ov_el,idx, err_act, itmpov, k, ns, glx, mth + integer :: int_err(5),exch(3) integer, allocatable :: prc_v(:), temp_ovrlap(:), ov_idx(:),ov_el(:) logical, parameter :: debug=.false. - character(len=20) :: name, char_err + character(len=20) :: name if(psb_get_errstatus() /= 0) return info=0 @@ -96,9 +96,10 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info) if (me == psb_root_) then exch(1)=m exch(2)=n - call psb_bcast(ictxt,exch(1:2),root=psb_root_) + exch(3)=psb_cd_get_large_threshold() + call psb_bcast(ictxt,exch(1:3),root=psb_root_) else - call psb_bcast(ictxt,exch(1:2),root=psb_root_) + call psb_bcast(ictxt,exch(1:3),root=psb_root_) if (exch(1) /= m) then err=550 int_err(1)=1 @@ -110,15 +111,23 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info) call psb_errpush(err,name,int_err) goto 9999 endif + call psb_cd_set_large_threshold(exch(3)) endif call psb_nullify_desc(desc_a) !count local rows number ! allocate work vector - allocate(prc_v(np),desc_a%glob_to_loc(m),& - &desc_a%matrix_data(psb_mdata_size_),temp_ovrlap(m),stat=info) - if (info /= no_err) then + if (m > psb_cd_get_large_threshold()) then + allocate(desc_a%matrix_data(psb_mdata_size_),& + & temp_ovrlap(m),prc_v(np),stat=info) + desc_a%matrix_data(psb_desc_size_) = psb_desc_large_ + else + allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& + & temp_ovrlap(m),prc_v(np),stat=info) + desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_ + end if + if (info /= 0) then info=2025 err=info int_err(1)=m @@ -131,76 +140,187 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info) counter = 0 itmpov = 0 temp_ovrlap(:) = -1 - do i=1,m - if (info == 0) then - call parts(i,m,np,prc_v,nprocs) - if (nprocs > np) then - info=570 - int_err(1)=3 - int_err(2)=np - int_err(3)=nprocs - int_err(4)=i - err=info - call psb_errpush(err,name,int_err) - goto 9999 - else if (nprocs <= 0) then - info=575 - int_err(1)=3 - int_err(2)=nprocs - int_err(3)=i - err=info - call psb_errpush(err,name,int_err) - goto 9999 - else - do j=1,nprocs - if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then - info=580 - int_err(1)=3 - int_err(2)=prc_v(j) - int_err(3)=i - err=info - call psb_errpush(err,name,int_err) - goto 9999 - end if - end do - endif - desc_a%glob_to_loc(i) = -(np+prc_v(1)+1) - j=1 - do - if (j > nprocs) exit - if (prc_v(j) == me) exit - j=j+1 - enddo - if (j <= nprocs) then - if (prc_v(j) == me) then - ! this point belongs to me - counter=counter+1 - desc_a%glob_to_loc(i) = counter - if (nprocs > 1) then - if ((itmpov+2+nprocs) > size(temp_ovrlap)) then - ns = max(itmpov+2+nprocs,int(1.25*size(temp_ovrlap))) - call psb_realloc(ns,temp_ovrlap,info,pad=-1) - if (info /= 0) then - info=2025 - int_err(1)=m - err=info - call psb_errpush(err,name,int_err) + if ( m >psb_cd_get_large_threshold()) then + loc_col = (m+np-1)/np + allocate(desc_a%loc_to_glob(loc_col), desc_a%lprm(1),& + & desc_a%ptree(2),stat=info) + if (info == 0) call InitPairSearchTree(desc_a%ptree,info) + if (info /= 0) then + info=2025 + int_err(1)=loc_col + call psb_errpush(info,name,i_err=int_err) + goto 9999 + end if + + ! set LOC_TO_GLOB array to all "-1" values + desc_a%lprm(1) = 0 + desc_a%loc_to_glob(:) = -1 + k = 0 + do i=1,m + if (info == 0) then + call parts(i,m,np,prc_v,nprocs) + if (nprocs > np) then + info=570 + int_err(1)=3 + int_err(2)=np + int_err(3)=nprocs + int_err(4)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else if (nprocs <= 0) then + info=575 + int_err(1)=3 + int_err(2)=nprocs + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else + do j=1,nprocs + if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then + info=580 + int_err(1)=3 + int_err(2)=prc_v(j) + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + end if + end do + endif + j=1 + do + if (j > nprocs) exit + if (prc_v(j) == me) exit + j=j+1 + enddo + + if (j <= nprocs) then + if (prc_v(j) == me) then + ! this point belongs to me + k = k + 1 + call psb_check_size((k+1),desc_a%loc_to_glob,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + desc_a%loc_to_glob(k) = i + call SearchInsKeyVal(desc_a%ptree,i,k,glx,info) + if (nprocs > 1) then + call psb_check_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') goto 9999 - endif + end if + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = i + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = nprocs + temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs) + itmpov = itmpov + nprocs endif - itmpov = itmpov + 1 - temp_ovrlap(itmpov) = i - itmpov = itmpov + 1 - temp_ovrlap(itmpov) = nprocs - temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs) - itmpov = itmpov + nprocs - endif - end if + end if + end if end if + enddo + if (info /= 0) then + info=4000 + call psb_errpush(info,name) + goto 9999 endif - enddo + loc_row = k + + else + + do i=1,m + if (info == 0) then + call parts(i,m,np,prc_v,nprocs) + if (nprocs > np) then + info=570 + int_err(1)=3 + int_err(2)=np + int_err(3)=nprocs + int_err(4)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else if (nprocs <= 0) then + info=575 + int_err(1)=3 + int_err(2)=nprocs + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + else + do j=1,nprocs + if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then + info=580 + int_err(1)=3 + int_err(2)=prc_v(j) + int_err(3)=i + err=info + call psb_errpush(err,name,int_err) + goto 9999 + end if + end do + endif + desc_a%glob_to_loc(i) = -(np+prc_v(1)+1) + j=1 + do + if (j > nprocs) exit + if (prc_v(j) == me) exit + j=j+1 + enddo + if (j <= nprocs) then + if (prc_v(j) == me) then + ! this point belongs to me + counter=counter+1 + desc_a%glob_to_loc(i) = counter + if (nprocs > 1) then + call psb_check_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = i + itmpov = itmpov + 1 + temp_ovrlap(itmpov) = nprocs + temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs) + itmpov = itmpov + nprocs + endif + end if + end if + endif + enddo + ! estimate local cols number + loc_row=counter + loc_col=min(2*loc_row,m) + + allocate(desc_a%loc_to_glob(loc_col),& + &desc_a%lprm(1),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if + + ! set LOC_TO_GLOB array to all "-1" values + desc_a%lprm(1) = 0 + desc_a%loc_to_glob(:) = -1 + do i=1,m + k = desc_a%glob_to_loc(i) + if (k > 0) then + desc_a%loc_to_glob(k) = i + endif + enddo + + end if - loc_row=counter ! check on parts function if (debug) write(*,*) 'PSB_CDALL: End main loop:' ,loc_row,itmpov,info @@ -227,9 +347,8 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info) allocate(ov_idx(l_ov_ix),ov_el(l_ov_el), stat=info) if (info /= no_err) then info=4010 - char_err='psb_realloc' err=info - call psb_errpush(err,name,a_err=char_err) + call psb_errpush(err,name,a_err='psb_realloc') goto 9999 end if @@ -260,51 +379,32 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info) call psb_transfer(ov_idx,desc_a%ovrlap_index,info) if (info == 0) call psb_transfer(ov_el,desc_a%ovrlap_elem,info) - deallocate(prc_v,temp_ovrlap,stat=info) + if (info == 0) deallocate(prc_v,temp_ovrlap,stat=info) if (info /= no_err) then info=4000 err=info call psb_errpush(err,name) Goto 9999 endif - ! estimate local cols number - loc_col=min(2*loc_row,m) - - allocate(desc_a%loc_to_glob(loc_col),& - &desc_a%lprm(1),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if - - ! set LOC_TO_GLOB array to all "-1" values - desc_a%lprm(1) = 0 - desc_a%loc_to_glob(:) = -1 - do i=1,m - k = desc_a%glob_to_loc(i) - if (k > 0) then - desc_a%loc_to_glob(k) = i - endif - enddo + ! At this point overlap_elem is OK. + desc_a%matrix_data(psb_ovl_state_) = psb_cd_ovl_asb_ ! set fields in desc_a%MATRIX_DATA.... desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row + call psb_cd_set_bld(desc_a,info) call psb_realloc(1,desc_a%halo_index, info) if (info /= no_err) then info=2025 - char_err='psb_realloc' - call psb_errpush(err,name,a_err=char_err) + call psb_errpush(err,name,a_err='psb_realloc') Goto 9999 end if desc_a%halo_index(:) = -1 - desc_a%matrix_data(psb_m_) = m desc_a%matrix_data(psb_n_) = n - desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ desc_a%matrix_data(psb_ctxt_) = ictxt call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_)) diff --git a/base/tools/psb_cdalv.f90 b/base/tools/psb_cdalv.f90 index a02790fe..0e9f3d5a 100644 --- a/base/tools/psb_cdalv.f90 +++ b/base/tools/psb_cdalv.f90 @@ -58,7 +58,7 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag) Integer :: counter,i,j,np,me,loc_row,err,& & loc_col,nprocs,m,n,itmpov, k,glx,gidx,gle,& & l_ov_ix,l_ov_el,idx, flag_, err_act - integer :: int_err(5),exch(2) + integer :: int_err(5),exch(3) Integer, allocatable :: temp_ovrlap(:), ov_idx(:),ov_el(:) logical, parameter :: debug=.false. character(len=20) :: name @@ -93,29 +93,27 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag) goto 9999 end if - if (debug) write(*,*) 'psb_cdall: doing global checks' - !global check on m and n parameters if (me == psb_root_) then exch(1)=m exch(2)=n - call psb_bcast(ictxt,exch(1:2),root=psb_root_) + exch(3)=psb_cd_get_large_threshold() + call psb_bcast(ictxt,exch(1:3),root=psb_root_) else - call psb_bcast(ictxt,exch(1:2),root=psb_root_) + call psb_bcast(ictxt,exch(1:3),root=psb_root_) if (exch(1) /= m) then - info=550 + err=550 int_err(1)=1 + call psb_errpush(err,name,int_err) + goto 9999 else if (exch(2) /= n) then - info=550 + err=550 int_err(1)=2 + call psb_errpush(err,name,int_err) + goto 9999 endif + call psb_cd_set_large_threshold(exch(3)) endif - if (info /= 0) then - call psb_errpush(info,name,i_err=int_err) - goto 9999 - end if - - call psb_nullify_desc(desc_a) if (present(flag)) then @@ -136,9 +134,11 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag) if (m > psb_cd_get_large_threshold()) then allocate(desc_a%matrix_data(psb_mdata_size_),& &temp_ovrlap(m),stat=info) + desc_a%matrix_data(psb_desc_size_) = psb_desc_large_ else allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& &temp_ovrlap(m),stat=info) + desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_ end if if (info /= 0) then info=2025 @@ -154,7 +154,6 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag) temp_ovrlap(:) = -1 if ( m >psb_cd_get_large_threshold()) then - desc_a%matrix_data(psb_dec_type_) = psb_desc_large_bld_ do i=1,m @@ -219,7 +218,6 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag) else - desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_ do i=1,m if (((v(i)-flag_) > np-1).or.((v(i)-flag_) < 0)) then @@ -336,13 +334,14 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag) ! set fields in desc_a%MATRIX_DATA.... desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row + call psb_cd_set_bld(desc_a,info) - allocate(desc_a%halo_index(1),stat=info) - if (info /= 0) then - info=4000 - call psb_errpush(info,name) - goto 9999 - endif + call psb_realloc(1,desc_a%halo_index, info) + if (info /= no_err) then + info=2025 + call psb_errpush(err,name,a_err='psb_realloc') + Goto 9999 + end if desc_a%halo_index(:) = -1 diff --git a/base/tools/psb_cdasb.f90 b/base/tools/psb_cdasb.f90 index 762e3c42..ff543436 100644 --- a/base/tools/psb_cdasb.f90 +++ b/base/tools/psb_cdasb.f90 @@ -128,21 +128,14 @@ subroutine psb_cdasb(desc_a,info) call psb_errpush(info,name) goto 9999 end if - - if (psb_is_large_dec(dectype) ) then - desc_a%matrix_data(psb_dec_type_) = psb_desc_large_asb_ -!!$ write(0,*) 'Done large dec asmbly',desc_a%matrix_data(psb_dec_type_),& -!!$ & psb_desc_large_asb_,psb_is_asb_dec(desc_a%matrix_data(psb_dec_type_)) - else - ! Ok, register into MATRIX_DATA & free temporary work areas - desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_ - endif + ! Ok, register into MATRIX_DATA & free temporary work areas + desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_ else info = 600 call psb_errpush(info,name) goto 9999 - if (debug) write(0,*) 'dectype 2 :',dectype,psb_desc_bld_,& - &psb_desc_asb_,psb_desc_upd_ + if (debug) write(0,*) 'dectype 2 :',psb_cd_get_dectype(desc_a),& + &psb_desc_bld_,psb_desc_asb_,psb_desc_upd_ endif call psb_erractionrestore(err_act) diff --git a/base/tools/psb_cdrep.f90 b/base/tools/psb_cdrep.f90 index 7ed846b6..7af1d15e 100644 --- a/base/tools/psb_cdrep.f90 +++ b/base/tools/psb_cdrep.f90 @@ -184,6 +184,9 @@ subroutine psb_cdrep(m, ictxt, desc_a, info) call psb_errpush(info,name,i_err=int_err) goto 9999 endif + ! If the index space is replicated there's no point in having + ! the AVL tree structure.... + desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_ desc_a%matrix_data(psb_m_) = m diff --git a/base/tools/psb_dcdovr.f90 b/base/tools/psb_dcdovr.f90 index aa0d3aab..bc1ae7be 100644 --- a/base/tools/psb_dcdovr.f90 +++ b/base/tools/psb_dcdovr.f90 @@ -156,86 +156,168 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info) l_tmp_halo = novr*(3*Size(desc_a%halo_index)) if (psb_is_large_desc(desc_a)) then - desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_ + desc_ov%matrix_data(psb_desc_size_) = psb_desc_large_ else - desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ + desc_ov%matrix_data(psb_desc_size_) = psb_desc_normal_ end if + call psb_cd_set_bld(desc_ov,info) +!!$ desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ + If(debug) then Write(0,*)'Start cdovrbld',me,lworks,lworkr call psb_barrier(ictxt) endif - if (.false.) then - ! - ! The real work goes on in here.... - ! - Call psb_cdovrbld(novr,desc_ov,desc_a,a,& - & l_tmp_halo,l_tmp_ovr_idx,lworks,lworkr,info) - - if (info /= 0) then - info=4010 - ch_err='psb_cdovrbld' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - If(debug) then - Write(0,*)'Done cdovrbld',me,lworks,lworkr - call psb_barrier(ictxt) - endif + Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if - else + Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& + & t_halo_out(l_tmp_halo), temp(lworkr),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if - Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if + call psb_sp_all(blk,max(lworks,lworkr),info) + if (info /= 0) then + info=4010 + ch_err='psb_sp_all' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if - Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& - & t_halo_out(l_tmp_halo), temp(lworkr),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if + blk%fida='COO' + Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),& + & halo(size(desc_a%halo_index)),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if + halo(:) = desc_a%halo_index(:) + desc_ov%ovrlap_elem(:) = -1 + tmp_ovr_idx(:) = -1 + tmp_halo(:) = -1 + counter_e = 1 + tot_recv = 0 + counter_h = 1 + counter_o = 1 + + ! Init overlap with desc_a%ovrlap (if any) + counter = 1 + Do While (desc_a%ovrlap_index(counter) /= -1) + proc = desc_a%ovrlap_index(counter+psb_proc_id_) + n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_) + n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_) + + Do j=0,n_elem_recv-1 + + idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) + If(idx > Size(desc_ov%loc_to_glob)) then + info=-3 + call psb_errpush(info,name) + goto 9999 + endif + + gidx = desc_ov%loc_to_glob(idx) + + call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + + tmp_ovr_idx(counter_o)=proc + tmp_ovr_idx(counter_o+1)=1 + tmp_ovr_idx(counter_o+2)=gidx + tmp_ovr_idx(counter_o+3)=-1 + counter_o=counter_o+3 + end Do + counter=counter+n_elem_recv+n_elem_send+2 + end Do - call psb_sp_all(blk,max(lworks,lworkr),info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_all' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - blk%fida='COO' - Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),& - & halo(size(desc_a%halo_index)),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if - halo(:) = desc_a%halo_index(:) - desc_ov%ovrlap_elem(:) = -1 - tmp_ovr_idx(:) = -1 - tmp_halo(:) = -1 - counter_e = 1 - tot_recv = 0 - counter_h = 1 - counter_o = 1 - - ! Init overlap with desc_a%ovrlap (if any) - counter = 1 - Do While (desc_a%ovrlap_index(counter) /= -1) - proc = desc_a%ovrlap_index(counter+psb_proc_id_) - n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_) - n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_) + ! + ! A picture is in order to understand what goes on here. + ! I is the internal part; H is halo, R row, C column. The final + ! matrix with N levels of overlap looks like this + ! + ! I | Hc1 | 0 | 0 | + ! -------|-----|-----|-----| + ! Hr1 | Hd1 | Hc2 | 0 | + ! -------|-----|-----|-----| + ! 0 | Hr2 | Hd2 | Hc2 | + ! -------|-----|-----|-----| + ! 0 | 0 | Hr3 | Hd3 | Hc3 + ! + ! At the start we already have I and Hc1, so we know the row + ! indices that will make up Hr1, and also who owns them. As we + ! actually get those rows, we receive the column indices in Hc2; + ! these define the row indices for Hr2, and so on. When we have + ! reached the desired level HrN, we may ignore HcN. + ! + ! + Do i_ovr = 1, novr + + if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr + + ! + ! At this point, halo contains a valid halo corresponding to the + ! matrix enlarged with the elements in the frontier for I_OVR-1. + ! At the start, this is just the halo for A; the rows for indices in + ! the first halo will contain column indices defining the second halo + ! level and so on. + ! + bsdindx(:) = 0 + sdsz(:) = 0 + brvindx(:) = 0 + rvsz(:) = 0 + idxr = 0 + idxs = 0 + counter = 1 + counter_t = 1 + + + Do While (halo(counter) /= -1) + tot_elem=0 + proc=halo(counter+psb_proc_id_) + n_elem_recv=halo(counter+psb_n_elem_recv_) + n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_) + If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then + info = -1 + call psb_errpush(info,name) + goto 9999 + end If + tot_recv=tot_recv+n_elem_recv + if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv + ! + ! + ! The format of the halo vector exists in two forms: 1. Temporary + ! 2. Assembled. In this loop we are using the (assembled) halo_in and + ! copying it into (temporary) halo_out; this is because tmp_halo will + ! be enlarged with the new column indices received, and will reassemble + ! everything for the next iteration. + ! + + ! + ! add recv elements in halo_index into ovrlap_index + ! Do j=0,n_elem_recv-1 + If((counter+psb_elem_recv_+j)>Size(halo)) then + info=-2 + call psb_errpush(info,name) + goto 9999 + end If - idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) + idx = halo(counter+psb_elem_recv_+j) If(idx > Size(desc_ov%loc_to_glob)) then info=-3 call psb_errpush(info,name) @@ -256,442 +338,340 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info) tmp_ovr_idx(counter_o+2)=gidx tmp_ovr_idx(counter_o+3)=-1 counter_o=counter_o+3 - end Do - counter=counter+n_elem_recv+n_elem_send+2 - end Do - - + if (.not.psb_is_large_desc(desc_ov)) then + call psb_check_size((counter_h+3),tmp_halo,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + tmp_halo(counter_h)=proc + tmp_halo(counter_h+1)=1 + tmp_halo(counter_h+2)=idx + tmp_halo(counter_h+3)=-1 - ! - ! A picture is in order to understand what goes on here. - ! I is the internal part; H is halo, R row, C column. The final - ! matrix with N levels of overlap looks like this - ! - ! I | Hc1 | 0 | 0 | - ! -------|-----|-----|-----| - ! Hr1 | Hd1 | Hc2 | 0 | - ! -------|-----|-----|-----| - ! 0 | Hr2 | Hd2 | Hc2 | - ! -------|-----|-----|-----| - ! 0 | 0 | Hr3 | Hd3 | Hc3 - ! - ! At the start we already have I and Hc1, so we know the row - ! indices that will make up Hr1, and also who owns them. As we - ! actually get those rows, we receive the column indices in Hc2; - ! these define the row indices for Hr2, and so on. When we have - ! reached the desired level HrN, we may ignore HcN. - ! - ! - Do i_ovr = 1, novr + counter_h=counter_h+3 + end if - if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr + Enddo + if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10) + counter = counter+n_elem_recv ! - ! At this point, halo contains a valid halo corresponding to the - ! matrix enlarged with the elements in the frontier for I_OVR-1. - ! At the start, this is just the halo for A; the rows for indices in - ! the first halo will contain column indices defining the second halo - ! level and so on. + ! add send elements in halo_index into ovrlap_index ! - bsdindx(:) = 0 - sdsz(:) = 0 - brvindx(:) = 0 - rvsz(:) = 0 - idxr = 0 - idxs = 0 - counter = 1 - counter_t = 1 - - - Do While (halo(counter) /= -1) - tot_elem=0 - proc=halo(counter+psb_proc_id_) - n_elem_recv=halo(counter+psb_n_elem_recv_) - n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_) - If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then - info = -1 - call psb_errpush(info,name) + Do j=0,n_elem_send-1 + + idx = halo(counter+psb_elem_send_+j) + gidx = desc_ov%loc_to_glob(idx) + if (idx > psb_cd_get_local_rows(Desc_a)) & + & write(0,*) me,i_ovr,'Out of local rows ',& + & idx,psb_cd_get_local_rows(Desc_a) + + call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') goto 9999 - end If - tot_recv=tot_recv+n_elem_recv - if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv - ! - ! - ! The format of the halo vector exists in two forms: 1. Temporary - ! 2. Assembled. In this loop we are using the (assembled) halo_in and - ! copying it into (temporary) halo_out; this is because tmp_halo will - ! be enlarged with the new column indices received, and will reassemble - ! everything for the next iteration. - ! + end if + + tmp_ovr_idx(counter_o)=proc + tmp_ovr_idx(counter_o+1)=1 + tmp_ovr_idx(counter_o+2)=gidx + tmp_ovr_idx(counter_o+3)=-1 + counter_o=counter_o+3 ! - ! add recv elements in halo_index into ovrlap_index + ! Prepare to exchange the halo rows with the other proc. ! - Do j=0,n_elem_recv-1 - If((counter+psb_elem_recv_+j)>Size(halo)) then - info=-2 - call psb_errpush(info,name) - goto 9999 - end If + If (i_ovr < (novr)) Then + n_elem = psb_sp_get_nnz_row(idx,a) - idx = halo(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then - info=-3 - call psb_errpush(info,name) - goto 9999 - endif - - gidx = desc_ov%loc_to_glob(idx) - - call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + call psb_check_size((idxs+tot_elem+n_elem),works,info) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_check_size') goto 9999 end if - tmp_ovr_idx(counter_o)=proc - tmp_ovr_idx(counter_o+1)=1 - tmp_ovr_idx(counter_o+2)=gidx - tmp_ovr_idx(counter_o+3)=-1 - counter_o=counter_o+3 - if (.not.psb_is_large_desc(desc_ov)) then - call psb_check_size((counter_h+3),tmp_halo,info,pad=-1) + If((n_elem) > size(blk%ia2)) Then + isz = max((3*size(blk%ia2))/2,(n_elem)) + if (debug) write(0,*) me,'Realloc blk',isz + call psb_sp_reall(blk,isz,info) if (info /= 0) then info=4010 - call psb_errpush(info,name,a_err='psb_check_size') + ch_err='psb_sp_reall' + call psb_errpush(info,name,a_err=ch_err) goto 9999 end if + End If - tmp_halo(counter_h)=proc - tmp_halo(counter_h+1)=1 - tmp_halo(counter_h+2)=idx - tmp_halo(counter_h+3)=-1 - - counter_h=counter_h+3 - end if - - Enddo - if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10) - counter = counter+n_elem_recv - - ! - ! add send elements in halo_index into ovrlap_index - ! - Do j=0,n_elem_send-1 - - idx = halo(counter+psb_elem_send_+j) - gidx = desc_ov%loc_to_glob(idx) - if (idx > psb_cd_get_local_rows(Desc_a)) & - & write(0,*) me,i_ovr,'Out of local rows ',& - & idx,psb_cd_get_local_rows(Desc_a) - - call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + call psb_sp_getblk(idx,a,blk,info) if (info /= 0) then info=4010 - call psb_errpush(info,name,a_err='psb_check_size') + ch_err='psb_sp_getblk' + call psb_errpush(info,name,a_err=ch_err) goto 9999 end if +!!$ write(0,*) me,'Iteration: ',j,i_ovr + Do jj=1,n_elem + works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) + End Do + tot_elem=tot_elem+n_elem + End If - tmp_ovr_idx(counter_o)=proc - tmp_ovr_idx(counter_o+1)=1 - tmp_ovr_idx(counter_o+2)=gidx - tmp_ovr_idx(counter_o+3)=-1 - counter_o=counter_o+3 - - ! - ! Prepare to exchange the halo rows with the other proc. - ! - If (i_ovr < (novr)) Then - n_elem = psb_sp_get_nnz_row(idx,a) + Enddo - call psb_check_size((idxs+tot_elem+n_elem),works,info) - if (info /= 0) then - info=4010 - call psb_errpush(info,name,a_err='psb_check_size') - goto 9999 - end if - If((n_elem) > size(blk%ia2)) Then - isz = max((3*size(blk%ia2))/2,(n_elem)) - if (debug) write(0,*) me,'Realloc blk',isz - call psb_sp_reall(blk,isz,info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_reall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - End If - - call psb_sp_getblk(idx,a,blk,info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_getblk' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 + if (i_ovr < novr) then + if (tot_elem > 1) then + call imsr(tot_elem,works(idxs+1)) + lx = works(idxs+1) + i = 1 + j = 1 + do + j = j + 1 + if (j > tot_elem) exit + if (works(idxs+j) /= lx) then + i = i + 1 + works(idxs+i) = works(idxs+j) + lx = works(idxs+i) end if -!!$ write(0,*) me,'Iteration: ',j,i_ovr - Do jj=1,n_elem - works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) - End Do - tot_elem=tot_elem+n_elem - End If - - Enddo - - - if (i_ovr < novr) then - if (tot_elem > 1) then - call imsr(tot_elem,works(idxs+1)) - lx = works(idxs+1) - i = 1 - j = 1 - do - j = j + 1 - if (j > tot_elem) exit - if (works(idxs+j) /= lx) then - i = i + 1 - works(idxs+i) = works(idxs+j) - lx = works(idxs+i) - end if - end do - tot_elem = i - endif - if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) - sdsz(proc+1) = tot_elem - idxs = idxs + tot_elem - end if + end do + tot_elem = i + endif + if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) + sdsz(proc+1) = tot_elem + idxs = idxs + tot_elem + end if + counter = counter+n_elem_send+3 + if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10) + Enddo + if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv + + if (i_ovr < novr) then + ! + ! Exchange data requests with everybody else: so far we have + ! accumulated RECV requests, we have an all-to-all to build + ! matchings SENDs. + ! + call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info) + if (info /= 0) then + info=4010 + ch_err='mpi_alltoall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=halo(counter) + if (proc == -1) exit + n_elem_recv = halo(counter+psb_n_elem_recv_) + counter = counter+n_elem_recv + n_elem_send = halo(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) counter = counter+n_elem_send+3 - if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10) Enddo - if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv - - if (i_ovr < novr) then - ! - ! Exchange data requests with everybody else: so far we have - ! accumulated RECV requests, we have an all-to-all to build - ! matchings SENDs. - ! - call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info) + + iszr=sum(rvsz) + if (max(iszr,1) > lworkr) then + call psb_realloc(max(iszr,1),workr,info) if (info /= 0) then info=4010 - ch_err='mpi_alltoall' + ch_err='psb_realloc' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - idxs = 0 - idxr = 0 - counter = 1 - Do - proc=halo(counter) - if (proc == -1) exit - n_elem_recv = halo(counter+psb_n_elem_recv_) - counter = counter+n_elem_recv - n_elem_send = halo(counter+psb_n_elem_send_) - - bsdindx(proc+1) = idxs - idxs = idxs + sdsz(proc+1) - brvindx(proc+1) = idxr - idxr = idxr + rvsz(proc+1) - counter = counter+n_elem_send+3 - Enddo - - iszr=sum(rvsz) - if (max(iszr,1) > lworkr) then - call psb_realloc(max(iszr,1),workr,info) - if (info /= 0) then - info=4010 - ch_err='psb_realloc' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - lworkr=max(iszr,1) - end if + lworkr=max(iszr,1) + end if + + call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& + & workr,rvsz,brvindx,mpi_integer,icomm,info) + if (info /= 0) then + info=4010 + ch_err='mpi_alltoallv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if - call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& - & workr,rvsz,brvindx,mpi_integer,icomm,info) + if (debug) write(0,*) 'ISZR :',iszr + + if (psb_is_large_desc(desc_a)) then + call psb_check_size(iszr,maskr,info) if (info /= 0) then info=4010 - ch_err='mpi_alltoallv' - call psb_errpush(info,name,a_err=ch_err) + call psb_errpush(info,name,a_err='psb_check_size') goto 9999 end if + call psi_idx_cnv(iszr,workr,maskr,desc_ov,info) + iszs = count(maskr(1:iszr)<=0) + if (iszs > size(works)) call psb_realloc(iszs,works,info) + j = 0 + do i=1,iszr + if (maskr(i) < 0) then + j=j+1 + works(j) = workr(i) + end if + end do + ! + ! fnd_owner on desc_a because we want the procs who + ! owned the rows from the beginning! + ! + call psi_fnd_owner(iszs,works,temp,desc_a,info) + n_col=psb_cd_get_local_cols(desc_ov) + + do i=1,iszs + idx = works(i) + n_col = psb_cd_get_local_cols(desc_ov) + call psi_idx_ins_cnv(idx,lidx,desc_ov,info) + if (psb_cd_get_local_cols(desc_ov) > n_col ) then + ! + ! This is a new index. Assigning a local index as + ! we receive them guarantees that all indices for HALO(I) + ! will be less than those for HALO(J) whenever I size(works)) call psb_realloc(iszs,works,info) - j = 0 - do i=1,iszr - if (maskr(i) < 0) then - j=j+1 - works(j) = workr(i) + call psb_check_size((counter_t+3),t_halo_in,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 end if - end do - ! - ! fnd_owner on desc_a because we want the procs who - ! owned the rows from the beginning! - ! - call psi_fnd_owner(iszs,works,temp,desc_a,info) - n_col=psb_cd_get_local_cols(desc_ov) - - do i=1,iszs - idx = works(i) - n_col = psb_cd_get_local_cols(desc_ov) - call psi_idx_ins_cnv(idx,lidx,desc_ov,info) - if (psb_cd_get_local_cols(desc_ov) > n_col ) then - ! - ! This is a new index. Assigning a local index as - ! we receive them guarantees that all indices for HALO(I) - ! will be less than those for HALO(J) whenever I0) then - lovr = ((nztot+m-1)/m)*nhalo*novr - lworks = ((nztot+m-1)/m)*nhalo - lworkr = ((nztot+m-1)/m)*nhalo + lovr = ((nztot+m-1)/m)*nhalo*novr + lworks = ((nztot+m-1)/m)*nhalo + lworkr = ((nztot+m-1)/m)*nhalo else info=-1 call psb_errpush(info,name) @@ -155,86 +155,168 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info) l_tmp_halo = novr*(3*Size(desc_a%halo_index)) if (psb_is_large_desc(desc_a)) then - desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_ + desc_ov%matrix_data(psb_desc_size_) = psb_desc_large_ else - desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ + desc_ov%matrix_data(psb_desc_size_) = psb_desc_normal_ end if + call psb_cd_set_bld(desc_ov,info) +!!$ desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ + If(debug) then Write(0,*)'Start cdovrbld',me,lworks,lworkr call psb_barrier(ictxt) endif - if (.false.) then - ! - ! The real work goes on in here.... - ! - Call psb_cdovrbld(novr,desc_ov,desc_a,a,& - & l_tmp_halo,l_tmp_ovr_idx,lworks,lworkr,info) - - if (info /= 0) then - info=4010 - ch_err='psb_cdovrbld' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - If(debug) then - Write(0,*)'Done cdovrbld',me,lworks,lworkr - call psb_barrier(ictxt) - endif + Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if - else + Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& + & t_halo_out(l_tmp_halo), temp(lworkr),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if - Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if + call psb_sp_all(blk,max(lworks,lworkr),info) + if (info /= 0) then + info=4010 + ch_err='psb_sp_all' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if - Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& - & t_halo_out(l_tmp_halo), temp(lworkr),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if + blk%fida='COO' + Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),& + & halo(size(desc_a%halo_index)),stat=info) + if (info /= 0) then + call psb_errpush(4010,name,a_err='Allocate') + goto 9999 + end if + halo(:) = desc_a%halo_index(:) + desc_ov%ovrlap_elem(:) = -1 + tmp_ovr_idx(:) = -1 + tmp_halo(:) = -1 + counter_e = 1 + tot_recv = 0 + counter_h = 1 + counter_o = 1 + + ! Init overlap with desc_a%ovrlap (if any) + counter = 1 + Do While (desc_a%ovrlap_index(counter) /= -1) + proc = desc_a%ovrlap_index(counter+psb_proc_id_) + n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_) + n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_) + + Do j=0,n_elem_recv-1 + + idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) + If(idx > Size(desc_ov%loc_to_glob)) then + info=-3 + call psb_errpush(info,name) + goto 9999 + endif + + gidx = desc_ov%loc_to_glob(idx) + + call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + + tmp_ovr_idx(counter_o)=proc + tmp_ovr_idx(counter_o+1)=1 + tmp_ovr_idx(counter_o+2)=gidx + tmp_ovr_idx(counter_o+3)=-1 + counter_o=counter_o+3 + end Do + counter=counter+n_elem_recv+n_elem_send+2 + end Do - call psb_sp_all(blk,max(lworks,lworkr),info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_all' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - blk%fida='COO' - Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),& - & halo(size(desc_a%halo_index)),stat=info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='Allocate') - goto 9999 - end if - halo(:) = desc_a%halo_index(:) - desc_ov%ovrlap_elem(:) = -1 - tmp_ovr_idx(:) = -1 - tmp_halo(:) = -1 - counter_e = 1 - tot_recv = 0 - counter_h = 1 - counter_o = 1 - - ! Init overlap with desc_a%ovrlap (if any) - counter = 1 - Do While (desc_a%ovrlap_index(counter) /= -1) - proc = desc_a%ovrlap_index(counter+psb_proc_id_) - n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_) - n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_) + ! + ! A picture is in order to understand what goes on here. + ! I is the internal part; H is halo, R row, C column. The final + ! matrix with N levels of overlap looks like this + ! + ! I | Hc1 | 0 | 0 | + ! -------|-----|-----|-----| + ! Hr1 | Hd1 | Hc2 | 0 | + ! -------|-----|-----|-----| + ! 0 | Hr2 | Hd2 | Hc2 | + ! -------|-----|-----|-----| + ! 0 | 0 | Hr3 | Hd3 | Hc3 + ! + ! At the start we already have I and Hc1, so we know the row + ! indices that will make up Hr1, and also who owns them. As we + ! actually get those rows, we receive the column indices in Hc2; + ! these define the row indices for Hr2, and so on. When we have + ! reached the desired level HrN, we may ignore HcN. + ! + ! + Do i_ovr = 1, novr + + if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr + + ! + ! At this point, halo contains a valid halo corresponding to the + ! matrix enlarged with the elements in the frontier for I_OVR-1. + ! At the start, this is just the halo for A; the rows for indices in + ! the first halo will contain column indices defining the second halo + ! level and so on. + ! + bsdindx(:) = 0 + sdsz(:) = 0 + brvindx(:) = 0 + rvsz(:) = 0 + idxr = 0 + idxs = 0 + counter = 1 + counter_t = 1 + + + Do While (halo(counter) /= -1) + tot_elem=0 + proc=halo(counter+psb_proc_id_) + n_elem_recv=halo(counter+psb_n_elem_recv_) + n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_) + If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then + info = -1 + call psb_errpush(info,name) + goto 9999 + end If + tot_recv=tot_recv+n_elem_recv + if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv + ! + ! + ! The format of the halo vector exists in two forms: 1. Temporary + ! 2. Assembled. In this loop we are using the (assembled) halo_in and + ! copying it into (temporary) halo_out; this is because tmp_halo will + ! be enlarged with the new column indices received, and will reassemble + ! everything for the next iteration. + ! + + ! + ! add recv elements in halo_index into ovrlap_index + ! Do j=0,n_elem_recv-1 + If((counter+psb_elem_recv_+j)>Size(halo)) then + info=-2 + call psb_errpush(info,name) + goto 9999 + end If - idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) + idx = halo(counter+psb_elem_recv_+j) If(idx > Size(desc_ov%loc_to_glob)) then info=-3 call psb_errpush(info,name) @@ -255,442 +337,340 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info) tmp_ovr_idx(counter_o+2)=gidx tmp_ovr_idx(counter_o+3)=-1 counter_o=counter_o+3 - end Do - counter=counter+n_elem_recv+n_elem_send+2 - end Do - - + if (.not.psb_is_large_desc(desc_ov)) then + call psb_check_size((counter_h+3),tmp_halo,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 + end if + tmp_halo(counter_h)=proc + tmp_halo(counter_h+1)=1 + tmp_halo(counter_h+2)=idx + tmp_halo(counter_h+3)=-1 - ! - ! A picture is in order to understand what goes on here. - ! I is the internal part; H is halo, R row, C column. The final - ! matrix with N levels of overlap looks like this - ! - ! I | Hc1 | 0 | 0 | - ! -------|-----|-----|-----| - ! Hr1 | Hd1 | Hc2 | 0 | - ! -------|-----|-----|-----| - ! 0 | Hr2 | Hd2 | Hc2 | - ! -------|-----|-----|-----| - ! 0 | 0 | Hr3 | Hd3 | Hc3 - ! - ! At the start we already have I and Hc1, so we know the row - ! indices that will make up Hr1, and also who owns them. As we - ! actually get those rows, we receive the column indices in Hc2; - ! these define the row indices for Hr2, and so on. When we have - ! reached the desired level HrN, we may ignore HcN. - ! - ! - Do i_ovr = 1, novr + counter_h=counter_h+3 + end if - if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr + Enddo + if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10) + counter = counter+n_elem_recv ! - ! At this point, halo contains a valid halo corresponding to the - ! matrix enlarged with the elements in the frontier for I_OVR-1. - ! At the start, this is just the halo for A; the rows for indices in - ! the first halo will contain column indices defining the second halo - ! level and so on. + ! add send elements in halo_index into ovrlap_index ! - bsdindx(:) = 0 - sdsz(:) = 0 - brvindx(:) = 0 - rvsz(:) = 0 - idxr = 0 - idxs = 0 - counter = 1 - counter_t = 1 - - - Do While (halo(counter) /= -1) - tot_elem=0 - proc=halo(counter+psb_proc_id_) - n_elem_recv=halo(counter+psb_n_elem_recv_) - n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_) - If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then - info = -1 - call psb_errpush(info,name) + Do j=0,n_elem_send-1 + + idx = halo(counter+psb_elem_send_+j) + gidx = desc_ov%loc_to_glob(idx) + if (idx > psb_cd_get_local_rows(Desc_a)) & + & write(0,*) me,i_ovr,'Out of local rows ',& + & idx,psb_cd_get_local_rows(Desc_a) + + call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') goto 9999 - end If - tot_recv=tot_recv+n_elem_recv - if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv - ! - ! - ! The format of the halo vector exists in two forms: 1. Temporary - ! 2. Assembled. In this loop we are using the (assembled) halo_in and - ! copying it into (temporary) halo_out; this is because tmp_halo will - ! be enlarged with the new column indices received, and will reassemble - ! everything for the next iteration. - ! + end if + + tmp_ovr_idx(counter_o)=proc + tmp_ovr_idx(counter_o+1)=1 + tmp_ovr_idx(counter_o+2)=gidx + tmp_ovr_idx(counter_o+3)=-1 + counter_o=counter_o+3 ! - ! add recv elements in halo_index into ovrlap_index + ! Prepare to exchange the halo rows with the other proc. ! - Do j=0,n_elem_recv-1 - If((counter+psb_elem_recv_+j)>Size(halo)) then - info=-2 - call psb_errpush(info,name) - goto 9999 - end If + If (i_ovr < (novr)) Then + n_elem = psb_sp_get_nnz_row(idx,a) - idx = halo(counter+psb_elem_recv_+j) - If(idx > Size(desc_ov%loc_to_glob)) then - info=-3 - call psb_errpush(info,name) - goto 9999 - endif - - gidx = desc_ov%loc_to_glob(idx) - - call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + call psb_check_size((idxs+tot_elem+n_elem),works,info) if (info /= 0) then info=4010 call psb_errpush(info,name,a_err='psb_check_size') goto 9999 end if - tmp_ovr_idx(counter_o)=proc - tmp_ovr_idx(counter_o+1)=1 - tmp_ovr_idx(counter_o+2)=gidx - tmp_ovr_idx(counter_o+3)=-1 - counter_o=counter_o+3 - if (.not.psb_is_large_desc(desc_ov)) then - call psb_check_size((counter_h+3),tmp_halo,info,pad=-1) + If((n_elem) > size(blk%ia2)) Then + isz = max((3*size(blk%ia2))/2,(n_elem)) + if (debug) write(0,*) me,'Realloc blk',isz + call psb_sp_reall(blk,isz,info) if (info /= 0) then info=4010 - call psb_errpush(info,name,a_err='psb_check_size') + ch_err='psb_sp_reall' + call psb_errpush(info,name,a_err=ch_err) goto 9999 end if + End If - tmp_halo(counter_h)=proc - tmp_halo(counter_h+1)=1 - tmp_halo(counter_h+2)=idx - tmp_halo(counter_h+3)=-1 - - counter_h=counter_h+3 - end if - - Enddo - if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10) - counter = counter+n_elem_recv - - ! - ! add send elements in halo_index into ovrlap_index - ! - Do j=0,n_elem_send-1 - - idx = halo(counter+psb_elem_send_+j) - gidx = desc_ov%loc_to_glob(idx) - if (idx > psb_cd_get_local_rows(Desc_a)) & - & write(0,*) me,i_ovr,'Out of local rows ',& - & idx,psb_cd_get_local_rows(Desc_a) - - call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1) + call psb_sp_getblk(idx,a,blk,info) if (info /= 0) then info=4010 - call psb_errpush(info,name,a_err='psb_check_size') + ch_err='psb_sp_getblk' + call psb_errpush(info,name,a_err=ch_err) goto 9999 end if +!!$ write(0,*) me,'Iteration: ',j,i_ovr + Do jj=1,n_elem + works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) + End Do + tot_elem=tot_elem+n_elem + End If - tmp_ovr_idx(counter_o)=proc - tmp_ovr_idx(counter_o+1)=1 - tmp_ovr_idx(counter_o+2)=gidx - tmp_ovr_idx(counter_o+3)=-1 - counter_o=counter_o+3 + Enddo - ! - ! Prepare to exchange the halo rows with the other proc. - ! - If (i_ovr < (novr)) Then - n_elem = psb_sp_get_nnz_row(idx,a) - call psb_check_size((idxs+tot_elem+n_elem),works,info) - if (info /= 0) then - info=4010 - call psb_errpush(info,name,a_err='psb_check_size') - goto 9999 + if (i_ovr < novr) then + if (tot_elem > 1) then + call imsr(tot_elem,works(idxs+1)) + lx = works(idxs+1) + i = 1 + j = 1 + do + j = j + 1 + if (j > tot_elem) exit + if (works(idxs+j) /= lx) then + i = i + 1 + works(idxs+i) = works(idxs+j) + lx = works(idxs+i) end if - - If((n_elem) > size(blk%ia2)) Then - isz = max((3*size(blk%ia2))/2,(n_elem)) - if (debug) write(0,*) me,'Realloc blk',isz - call psb_sp_reall(blk,isz,info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_reall' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - End If - - call psb_sp_getblk(idx,a,blk,info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_getblk' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if -!!$ write(0,*) me,'Iteration: ',j,i_ovr - Do jj=1,n_elem - works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) - End Do - tot_elem=tot_elem+n_elem - End If - - Enddo - - - if (i_ovr < novr) then - if (tot_elem > 1) then - call imsr(tot_elem,works(idxs+1)) - lx = works(idxs+1) - i = 1 - j = 1 - do - j = j + 1 - if (j > tot_elem) exit - if (works(idxs+j) /= lx) then - i = i + 1 - works(idxs+i) = works(idxs+j) - lx = works(idxs+i) - end if - end do - tot_elem = i - endif - if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) - sdsz(proc+1) = tot_elem - idxs = idxs + tot_elem - end if + end do + tot_elem = i + endif + if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) + sdsz(proc+1) = tot_elem + idxs = idxs + tot_elem + end if + counter = counter+n_elem_send+3 + if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10) + Enddo + if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv + + if (i_ovr < novr) then + ! + ! Exchange data requests with everybody else: so far we have + ! accumulated RECV requests, we have an all-to-all to build + ! matchings SENDs. + ! + call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info) + if (info /= 0) then + info=4010 + ch_err='mpi_alltoall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + idxs = 0 + idxr = 0 + counter = 1 + Do + proc=halo(counter) + if (proc == -1) exit + n_elem_recv = halo(counter+psb_n_elem_recv_) + counter = counter+n_elem_recv + n_elem_send = halo(counter+psb_n_elem_send_) + + bsdindx(proc+1) = idxs + idxs = idxs + sdsz(proc+1) + brvindx(proc+1) = idxr + idxr = idxr + rvsz(proc+1) counter = counter+n_elem_send+3 - if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10) Enddo - if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv - - if (i_ovr < novr) then - ! - ! Exchange data requests with everybody else: so far we have - ! accumulated RECV requests, we have an all-to-all to build - ! matchings SENDs. - ! - call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info) + + iszr=sum(rvsz) + if (max(iszr,1) > lworkr) then + call psb_realloc(max(iszr,1),workr,info) if (info /= 0) then info=4010 - ch_err='mpi_alltoall' + ch_err='psb_realloc' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - idxs = 0 - idxr = 0 - counter = 1 - Do - proc=halo(counter) - if (proc == -1) exit - n_elem_recv = halo(counter+psb_n_elem_recv_) - counter = counter+n_elem_recv - n_elem_send = halo(counter+psb_n_elem_send_) - - bsdindx(proc+1) = idxs - idxs = idxs + sdsz(proc+1) - brvindx(proc+1) = idxr - idxr = idxr + rvsz(proc+1) - counter = counter+n_elem_send+3 - Enddo - - iszr=sum(rvsz) - if (max(iszr,1) > lworkr) then - call psb_realloc(max(iszr,1),workr,info) - if (info /= 0) then - info=4010 - ch_err='psb_realloc' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if - lworkr=max(iszr,1) - end if + lworkr=max(iszr,1) + end if + + call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& + & workr,rvsz,brvindx,mpi_integer,icomm,info) + if (info /= 0) then + info=4010 + ch_err='mpi_alltoallv' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if - call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& - & workr,rvsz,brvindx,mpi_integer,icomm,info) + if (debug) write(0,*) 'ISZR :',iszr + + if (psb_is_large_desc(desc_a)) then + call psb_check_size(iszr,maskr,info) if (info /= 0) then info=4010 - ch_err='mpi_alltoallv' - call psb_errpush(info,name,a_err=ch_err) + call psb_errpush(info,name,a_err='psb_check_size') goto 9999 end if + call psi_idx_cnv(iszr,workr,maskr,desc_ov,info) + iszs = count(maskr(1:iszr)<=0) + if (iszs > size(works)) call psb_realloc(iszs,works,info) + j = 0 + do i=1,iszr + if (maskr(i) < 0) then + j=j+1 + works(j) = workr(i) + end if + end do + ! + ! fnd_owner on desc_a because we want the procs who + ! owned the rows from the beginning! + ! + call psi_fnd_owner(iszs,works,temp,desc_a,info) + n_col=psb_cd_get_local_cols(desc_ov) + + do i=1,iszs + idx = works(i) + n_col = psb_cd_get_local_cols(desc_ov) + call psi_idx_ins_cnv(idx,lidx,desc_ov,info) + if (psb_cd_get_local_cols(desc_ov) > n_col ) then + ! + ! This is a new index. Assigning a local index as + ! we receive them guarantees that all indices for HALO(I) + ! will be less than those for HALO(J) whenever I size(works)) call psb_realloc(iszs,works,info) - j = 0 - do i=1,iszr - if (maskr(i) < 0) then - j=j+1 - works(j) = workr(i) + desc_ov%glob_to_loc(idx)=n_col + desc_ov%loc_to_glob(n_col)=idx + + call psb_check_size((counter_t+3),t_halo_in,info,pad=-1) + if (info /= 0) then + info=4010 + call psb_errpush(info,name,a_err='psb_check_size') + goto 9999 end if - end do - ! - ! fnd_owner on desc_a because we want the procs who - ! owned the rows from the beginning! - ! - call psi_fnd_owner(iszs,works,temp,desc_a,info) - n_col=psb_cd_get_local_cols(desc_ov) - - do i=1,iszs - idx = works(i) - n_col = psb_cd_get_local_cols(desc_ov) - call psi_idx_ins_cnv(idx,lidx,desc_ov,info) - if (psb_cd_get_local_cols(desc_ov) > n_col ) then - ! - ! This is a new index. Assigning a local index as - ! we receive them guarantees that all indices for HALO(I) - ! will be less than those for HALO(J) whenever I