Fixed state vs. size of descriptor in two separate entries in matrix_data,

making them orthogonal.
Updated tools sources accordingly.
psblas3-type-indexed
Salvatore Filippone 20 years ago
parent 246bd82a93
commit ab8704dd91

@ -58,21 +58,24 @@ module psb_const_mod
! !
integer, parameter :: psb_dec_type_=1, psb_m_=2,psb_n_=3 integer, parameter :: psb_dec_type_=1, psb_m_=2,psb_n_=3
integer, parameter :: psb_n_row_=4, psb_n_col_=5,psb_ctxt_=6 integer, parameter :: psb_n_row_=4, psb_n_col_=5,psb_ctxt_=6
integer, parameter :: psb_loc_to_glob_=7 integer, parameter :: psb_desc_size_=7
integer, parameter :: psb_ovl_state_=8
integer, parameter :: psb_mpi_c_=9
integer, parameter :: psb_thal_xch_=11 integer, parameter :: psb_thal_xch_=11
integer, parameter :: psb_thal_snd_=12 integer, parameter :: psb_thal_snd_=12
integer, parameter :: psb_thal_rcv_=13 integer, parameter :: psb_thal_rcv_=13
integer, parameter :: psb_tovr_xch_=14 integer, parameter :: psb_tovr_xch_=14
integer, parameter :: psb_tovr_snd_=15 integer, parameter :: psb_tovr_snd_=15
integer, parameter :: psb_tovr_rcv_=16 integer, parameter :: psb_tovr_rcv_=16
integer, parameter :: psb_mpi_c_=9,psb_mdata_size_=20 integer, parameter :: psb_mdata_size_=20
integer, parameter :: psb_desc_asb_=3099 integer, parameter :: psb_desc_asb_=3099
integer, parameter :: psb_desc_bld_=psb_desc_asb_+1 integer, parameter :: psb_desc_bld_=psb_desc_asb_+1
integer, parameter :: psb_desc_repl_=3199 integer, parameter :: psb_desc_repl_=3199
integer, parameter :: psb_desc_upd_=psb_desc_bld_+1 integer, parameter :: psb_desc_upd_=psb_desc_bld_+1
integer, parameter :: psb_desc_upd_asb_=psb_desc_upd_+1 integer, parameter :: psb_desc_normal_=3299
integer, parameter :: psb_desc_large_asb_=psb_desc_upd_asb_+1 integer, parameter :: psb_desc_large_=psb_desc_normal_+1
integer, parameter :: psb_desc_large_bld_=psb_desc_large_asb_+1 integer, parameter :: psb_cd_ovl_bld_=3399
integer, parameter :: psb_cd_ovl_asb_=psb_cd_ovl_bld_+1
integer, parameter :: nbits=14 integer, parameter :: nbits=14
integer, parameter :: hashsize=2**nbits, hashmask=hashsize-1 integer, parameter :: hashsize=2**nbits, hashmask=hashsize-1
integer, parameter :: psb_default_large_threshold=4*1024*1024 ! to be reviewed integer, parameter :: psb_default_large_threshold=4*1024*1024 ! to be reviewed

@ -111,7 +111,7 @@ contains
logical function psb_is_large_desc(desc) logical function psb_is_large_desc(desc)
type(psb_desc_type), intent(in) :: desc type(psb_desc_type), intent(in) :: desc
psb_is_large_desc = psb_is_large_dec(psb_cd_get_dectype(desc)) psb_is_large_desc =(psb_desc_large_==psb_cd_get_size(desc))
end function psb_is_large_desc end function psb_is_large_desc
@ -122,12 +122,6 @@ contains
end function psb_is_upd_desc end function psb_is_upd_desc
logical function psb_is_asb_upd_desc(desc)
type(psb_desc_type), intent(in) :: desc
psb_is_asb_upd_desc = psb_is_asb_upd_dec(psb_cd_get_dectype(desc))
end function psb_is_asb_upd_desc
logical function psb_is_asb_desc(desc) logical function psb_is_asb_desc(desc)
type(psb_desc_type), intent(in) :: desc type(psb_desc_type), intent(in) :: desc
@ -141,16 +135,14 @@ contains
integer :: dectype integer :: dectype
psb_is_ok_dec = ((dectype == psb_desc_asb_).or.(dectype == psb_desc_bld_).or.& psb_is_ok_dec = ((dectype == psb_desc_asb_).or.(dectype == psb_desc_bld_).or.&
&(dectype == psb_desc_upd_).or.(dectype== psb_desc_upd_asb_).or.& &(dectype == psb_desc_upd_).or.&
&(dectype == psb_desc_large_asb_).or.(dectype == psb_desc_large_bld_).or.&
&(dectype== psb_desc_repl_)) &(dectype== psb_desc_repl_))
end function psb_is_ok_dec end function psb_is_ok_dec
logical function psb_is_bld_dec(dectype) logical function psb_is_bld_dec(dectype)
integer :: dectype integer :: dectype
psb_is_bld_dec = (dectype == psb_desc_bld_)& psb_is_bld_dec = (dectype == psb_desc_bld_)
& .or.(dectype == psb_desc_large_bld_)
end function psb_is_bld_dec end function psb_is_bld_dec
logical function psb_is_upd_dec(dectype) logical function psb_is_upd_dec(dectype)
@ -160,18 +152,11 @@ contains
end function psb_is_upd_dec end function psb_is_upd_dec
logical function psb_is_asb_upd_dec(dectype)
integer :: dectype
psb_is_asb_upd_dec = (dectype == psb_desc_upd_asb_)
end function psb_is_asb_upd_dec logical function psb_is_asb_dec(dectype)
logical function psb_is_asb_dec(dectype)
integer :: dectype integer :: dectype
psb_is_asb_dec = (dectype == psb_desc_asb_)& psb_is_asb_dec = (dectype == psb_desc_asb_).or.&
& .or.(dectype == psb_desc_large_asb_).or.&
& (dectype== psb_desc_repl_) & (dectype== psb_desc_repl_)
end function psb_is_asb_dec end function psb_is_asb_dec
@ -213,19 +198,79 @@ contains
psb_cd_get_dectype = desc%matrix_data(psb_dec_type_) psb_cd_get_dectype = desc%matrix_data(psb_dec_type_)
end function psb_cd_get_dectype end function psb_cd_get_dectype
integer function psb_cd_get_size(desc)
type(psb_desc_type), intent(in) :: desc
psb_cd_get_size = desc%matrix_data(psb_desc_size_)
end function psb_cd_get_size
integer function psb_cd_get_mpic(desc) integer function psb_cd_get_mpic(desc)
type(psb_desc_type), intent(in) :: desc type(psb_desc_type), intent(in) :: desc
psb_cd_get_mpic = desc%matrix_data(psb_mpi_c_) psb_cd_get_mpic = desc%matrix_data(psb_mpi_c_)
end function psb_cd_get_mpic end function psb_cd_get_mpic
logical function psb_is_large_dec(dectype)
integer :: dectype
psb_is_large_dec = (dectype == psb_desc_large_asb_)&
& .or.(dectype == psb_desc_large_bld_)
end function psb_is_large_dec subroutine psb_cd_set_bld(desc,info)
!
! Change state of a descriptor into BUILD.
! If the descriptor is LARGE, check the AVL search tree
! and initialize it if necessary.
!
use psb_const_mod
use psb_error_mod
use psb_penv_mod
implicit none
type(psb_desc_type), intent(inout) :: desc
integer :: info
!locals
integer :: np,me,ictxt, isz, err_act,idx,gidx,lidx
logical, parameter :: debug=.false.,debugprt=.false.
character(len=20) :: name, char_err
if (debug) write(0,*) me,'Entered CDCPY'
if (psb_get_errstatus() /= 0) return
info = 0
call psb_erractionsave(err_act)
name = 'psb_cd_set_bld'
ictxt = psb_cd_get_context(desc)
! check on blacs grid
call psb_info(ictxt, me, np)
if (debug) write(0,*) me,'Entered CDCPY'
if (psb_is_large_desc(desc)) then
if (.not.allocated(desc%ptree)) then
allocate(desc%ptree(2),stat=info)
if (info /= 0) then
info=4000
goto 9999
endif
call InitPairSearchTree(desc%ptree,info)
do idx=1, psb_cd_get_local_cols(desc)
gidx = desc%loc_to_glob(idx)
call SearchInsKeyVal(desc%ptree,gidx,idx,lidx,info)
if (lidx /= idx) then
write(0,*) 'Warning from cdset: mismatch in PTREE ',idx,lidx
endif
enddo
end if
end if
desc%matrix_data(psb_dec_type_) = psb_desc_bld_
call psb_erractionrestore(err_act)
return
9999 continue
call psb_erractionrestore(err_act)
if (err_act == act_ret) then
return
else
call psb_error(ictxt)
end if
return
end subroutine psb_cd_set_bld
end module psb_descriptor_type end module psb_descriptor_type

@ -706,7 +706,7 @@ contains
call psb_sum(ictxt,itmpsz) call psb_sum(ictxt,itmpsz)
nlp=0 nlp=0
do i=0, me-1 do i=0, me-1
nlp = nlp + itmpsz(me) nlp = nlp + itmpsz(i)
end do end do
call psb_cd_inloc((/(i,i=nlp+1,nlp+nl)/),ictxt,desc_a,info) call psb_cd_inloc((/(i,i=nlp+1,nlp+nl)/),ictxt,desc_a,info)

@ -56,7 +56,7 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
Integer :: counter,i,j,np,me,loc_row,err,& Integer :: counter,i,j,np,me,loc_row,err,&
& loc_col,nprocs,n,itmpov, k,glx,gidx,gle,& & loc_col,nprocs,n,itmpov, k,glx,gidx,gle,&
& l_ov_ix,l_ov_el,idx, flag_, err_act,m & l_ov_ix,l_ov_el,idx, flag_, err_act,m
integer :: int_err(5),exch(2) integer :: int_err(5),exch(3)
Integer, allocatable :: temp_ovrlap(:), ov_idx(:),ov_el(:),tmpgidx(:,:) Integer, allocatable :: temp_ovrlap(:), ov_idx(:),ov_el(:),tmpgidx(:,:)
logical, parameter :: debug=.false. logical, parameter :: debug=.false.
character(len=20) :: name character(len=20) :: name
@ -64,16 +64,16 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
if(psb_get_errstatus() /= 0) return if(psb_get_errstatus() /= 0) return
info=0 info=0
err=0 err=0
name = 'psb_cdalv' name = 'psb_cd_inloc'
call psb_info(ictxt, me, np) call psb_info(ictxt, me, np)
if (debug) write(*,*) 'psb_cdall: ',np,me if (debug) write(*,*) 'psb_cdall: ',np,me
loc_row=size(v) loc_row = size(v)
m = loc_row
m = loc_row
call psb_sum(ictxt,m) call psb_sum(ictxt,m)
n = m n = m
!... check m and n parameters.... !... check m and n parameters....
if (m < 1) then if (m < 1) then
@ -90,6 +90,26 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
call psb_errpush(info,name,i_err=int_err) call psb_errpush(info,name,i_err=int_err)
goto 9999 goto 9999
end if end if
if (me == psb_root_) then
exch(1)=m
exch(2)=n
exch(3)=psb_cd_get_large_threshold()
call psb_bcast(ictxt,exch(1:3),root=psb_root_)
else
call psb_bcast(ictxt,exch(1:3),root=psb_root_)
if (exch(1) /= m) then
err=550
int_err(1)=1
call psb_errpush(err,name,int_err)
goto 9999
else if (exch(2) /= n) then
err=550
int_err(1)=2
call psb_errpush(err,name,int_err)
goto 9999
endif
call psb_cd_set_large_threshold(exch(3))
endif
if (debug) write(*,*) 'psb_cdall: doing global checks' if (debug) write(*,*) 'psb_cdall: doing global checks'
allocate(tmpgidx(m,2),stat=info) allocate(tmpgidx(m,2),stat=info)
@ -98,18 +118,30 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
call psb_errpush(info,name,i_err=int_err) call psb_errpush(info,name,i_err=int_err)
goto 9999 goto 9999
end if end if
tmpgidx=0 tmpgidx=0
flag_=1
do i=1,loc_row do i=1,loc_row
if ((v(i)<1).or.(v(i)>m)) then if ((v(i)<1).or.(v(i)>m)) then
info = 551 info = 551
int_err(1) = i
int_err(2) = v(i)
int_err(3) = loc_row
int_err(4) = m
else else
tmpgidx(v(i),1) = me tmpgidx(v(i),1) = me+flag_
tmpgidx(v(i),2) = 1 tmpgidx(v(i),2) = 1
endif endif
end do end do
call psb_amx(ictxt,tmpgidx) call psb_amx(ictxt,tmpgidx)
if (count(tmpgidx(:,2) == 0)>0) then if (info ==0) then
info = 552 int_err(1) = count(tmpgidx(:,2) == 0)
int_err(2) = m
if (int_err(1)>0) then
info = 552
end if
end if end if
if (info /= 0) then if (info /= 0) then
@ -120,16 +152,17 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
call psb_nullify_desc(desc_a) call psb_nullify_desc(desc_a)
flag_=0
!count local rows number !count local rows number
! allocate work vector ! allocate work vector
if (m > psb_cd_get_large_threshold()) then if (m > psb_cd_get_large_threshold()) then
allocate(desc_a%matrix_data(psb_mdata_size_),& allocate(desc_a%matrix_data(psb_mdata_size_),&
&temp_ovrlap(m),stat=info) &temp_ovrlap(m),stat=info)
desc_a%matrix_data(psb_desc_size_) = psb_desc_large_
else else
allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),&
&temp_ovrlap(m),stat=info) &temp_ovrlap(m),stat=info)
desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_
end if end if
if (info /= 0) then if (info /= 0) then
info=2025 info=2025
@ -145,7 +178,6 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
temp_ovrlap(:) = -1 temp_ovrlap(:) = -1
if (m > psb_cd_get_large_threshold()) then if (m > psb_cd_get_large_threshold()) then
desc_a%matrix_data(psb_dec_type_) = psb_desc_large_bld_
do i=1,m do i=1,m
@ -210,7 +242,6 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
else else
desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_
do i=1,m do i=1,m
if (((tmpgidx(i,1)-flag_) > np-1).or.((tmpgidx(i,1)-flag_) < 0)) then if (((tmpgidx(i,1)-flag_) > np-1).or.((tmpgidx(i,1)-flag_) < 0)) then
@ -327,13 +358,14 @@ subroutine psb_cd_inloc(v, ictxt, desc_a, info)
! set fields in desc_a%MATRIX_DATA.... ! set fields in desc_a%MATRIX_DATA....
desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_row_) = loc_row
desc_a%matrix_data(psb_n_col_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row
call psb_cd_set_bld(desc_a,info)
allocate(desc_a%halo_index(1),stat=info) call psb_realloc(1,desc_a%halo_index, info)
if (info /= 0) then if (info /= no_err) then
info=4000 info=2025
call psb_errpush(info,name) call psb_errpush(err,name,a_err='psb_realloc')
goto 9999 Goto 9999
endif end if
desc_a%halo_index(:) = -1 desc_a%halo_index(:) = -1

@ -57,11 +57,11 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info)
!locals !locals
Integer :: counter,i,j,np,me,loc_row,err,loc_col,nprocs,& Integer :: counter,i,j,np,me,loc_row,err,loc_col,nprocs,&
& l_ov_ix,l_ov_el,idx, err_act, itmpov, k, ns & l_ov_ix,l_ov_el,idx, err_act, itmpov, k, ns, glx, mth
integer :: int_err(5),exch(2) integer :: int_err(5),exch(3)
integer, allocatable :: prc_v(:), temp_ovrlap(:), ov_idx(:),ov_el(:) integer, allocatable :: prc_v(:), temp_ovrlap(:), ov_idx(:),ov_el(:)
logical, parameter :: debug=.false. logical, parameter :: debug=.false.
character(len=20) :: name, char_err character(len=20) :: name
if(psb_get_errstatus() /= 0) return if(psb_get_errstatus() /= 0) return
info=0 info=0
@ -96,9 +96,10 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info)
if (me == psb_root_) then if (me == psb_root_) then
exch(1)=m exch(1)=m
exch(2)=n exch(2)=n
call psb_bcast(ictxt,exch(1:2),root=psb_root_) exch(3)=psb_cd_get_large_threshold()
call psb_bcast(ictxt,exch(1:3),root=psb_root_)
else else
call psb_bcast(ictxt,exch(1:2),root=psb_root_) call psb_bcast(ictxt,exch(1:3),root=psb_root_)
if (exch(1) /= m) then if (exch(1) /= m) then
err=550 err=550
int_err(1)=1 int_err(1)=1
@ -110,15 +111,23 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info)
call psb_errpush(err,name,int_err) call psb_errpush(err,name,int_err)
goto 9999 goto 9999
endif endif
call psb_cd_set_large_threshold(exch(3))
endif endif
call psb_nullify_desc(desc_a) call psb_nullify_desc(desc_a)
!count local rows number !count local rows number
! allocate work vector ! allocate work vector
allocate(prc_v(np),desc_a%glob_to_loc(m),& if (m > psb_cd_get_large_threshold()) then
&desc_a%matrix_data(psb_mdata_size_),temp_ovrlap(m),stat=info) allocate(desc_a%matrix_data(psb_mdata_size_),&
if (info /= no_err) then & temp_ovrlap(m),prc_v(np),stat=info)
desc_a%matrix_data(psb_desc_size_) = psb_desc_large_
else
allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),&
& temp_ovrlap(m),prc_v(np),stat=info)
desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_
end if
if (info /= 0) then
info=2025 info=2025
err=info err=info
int_err(1)=m int_err(1)=m
@ -131,76 +140,187 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info)
counter = 0 counter = 0
itmpov = 0 itmpov = 0
temp_ovrlap(:) = -1 temp_ovrlap(:) = -1
do i=1,m if ( m >psb_cd_get_large_threshold()) then
if (info == 0) then loc_col = (m+np-1)/np
call parts(i,m,np,prc_v,nprocs) allocate(desc_a%loc_to_glob(loc_col), desc_a%lprm(1),&
if (nprocs > np) then & desc_a%ptree(2),stat=info)
info=570 if (info == 0) call InitPairSearchTree(desc_a%ptree,info)
int_err(1)=3 if (info /= 0) then
int_err(2)=np info=2025
int_err(3)=nprocs int_err(1)=loc_col
int_err(4)=i call psb_errpush(info,name,i_err=int_err)
err=info goto 9999
call psb_errpush(err,name,int_err) end if
goto 9999
else if (nprocs <= 0) then ! set LOC_TO_GLOB array to all "-1" values
info=575 desc_a%lprm(1) = 0
int_err(1)=3 desc_a%loc_to_glob(:) = -1
int_err(2)=nprocs k = 0
int_err(3)=i do i=1,m
err=info if (info == 0) then
call psb_errpush(err,name,int_err) call parts(i,m,np,prc_v,nprocs)
goto 9999 if (nprocs > np) then
else info=570
do j=1,nprocs int_err(1)=3
if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then int_err(2)=np
info=580 int_err(3)=nprocs
int_err(1)=3 int_err(4)=i
int_err(2)=prc_v(j) err=info
int_err(3)=i call psb_errpush(err,name,int_err)
err=info goto 9999
call psb_errpush(err,name,int_err) else if (nprocs <= 0) then
goto 9999 info=575
end if int_err(1)=3
end do int_err(2)=nprocs
endif int_err(3)=i
desc_a%glob_to_loc(i) = -(np+prc_v(1)+1) err=info
j=1 call psb_errpush(err,name,int_err)
do goto 9999
if (j > nprocs) exit else
if (prc_v(j) == me) exit do j=1,nprocs
j=j+1 if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then
enddo info=580
if (j <= nprocs) then int_err(1)=3
if (prc_v(j) == me) then int_err(2)=prc_v(j)
! this point belongs to me int_err(3)=i
counter=counter+1 err=info
desc_a%glob_to_loc(i) = counter call psb_errpush(err,name,int_err)
if (nprocs > 1) then goto 9999
if ((itmpov+2+nprocs) > size(temp_ovrlap)) then end if
ns = max(itmpov+2+nprocs,int(1.25*size(temp_ovrlap))) end do
call psb_realloc(ns,temp_ovrlap,info,pad=-1) endif
if (info /= 0) then j=1
info=2025 do
int_err(1)=m if (j > nprocs) exit
err=info if (prc_v(j) == me) exit
call psb_errpush(err,name,int_err) j=j+1
enddo
if (j <= nprocs) then
if (prc_v(j) == me) then
! this point belongs to me
k = k + 1
call psb_check_size((k+1),desc_a%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
desc_a%loc_to_glob(k) = i
call SearchInsKeyVal(desc_a%ptree,i,k,glx,info)
if (nprocs > 1) then
call psb_check_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999 goto 9999
endif end if
itmpov = itmpov + 1
temp_ovrlap(itmpov) = i
itmpov = itmpov + 1
temp_ovrlap(itmpov) = nprocs
temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs)
itmpov = itmpov + nprocs
endif endif
itmpov = itmpov + 1 end if
temp_ovrlap(itmpov) = i end if
itmpov = itmpov + 1
temp_ovrlap(itmpov) = nprocs
temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs)
itmpov = itmpov + nprocs
endif
end if
end if end if
enddo
if (info /= 0) then
info=4000
call psb_errpush(info,name)
goto 9999
endif endif
enddo loc_row = k
else
do i=1,m
if (info == 0) then
call parts(i,m,np,prc_v,nprocs)
if (nprocs > np) then
info=570
int_err(1)=3
int_err(2)=np
int_err(3)=nprocs
int_err(4)=i
err=info
call psb_errpush(err,name,int_err)
goto 9999
else if (nprocs <= 0) then
info=575
int_err(1)=3
int_err(2)=nprocs
int_err(3)=i
err=info
call psb_errpush(err,name,int_err)
goto 9999
else
do j=1,nprocs
if ((prc_v(j) > np-1).or.(prc_v(j) < 0)) then
info=580
int_err(1)=3
int_err(2)=prc_v(j)
int_err(3)=i
err=info
call psb_errpush(err,name,int_err)
goto 9999
end if
end do
endif
desc_a%glob_to_loc(i) = -(np+prc_v(1)+1)
j=1
do
if (j > nprocs) exit
if (prc_v(j) == me) exit
j=j+1
enddo
if (j <= nprocs) then
if (prc_v(j) == me) then
! this point belongs to me
counter=counter+1
desc_a%glob_to_loc(i) = counter
if (nprocs > 1) then
call psb_check_size((itmpov+3+nprocs),temp_ovrlap,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
itmpov = itmpov + 1
temp_ovrlap(itmpov) = i
itmpov = itmpov + 1
temp_ovrlap(itmpov) = nprocs
temp_ovrlap(itmpov+1:itmpov+nprocs) = prc_v(1:nprocs)
itmpov = itmpov + nprocs
endif
end if
end if
endif
enddo
! estimate local cols number
loc_row=counter
loc_col=min(2*loc_row,m)
allocate(desc_a%loc_to_glob(loc_col),&
&desc_a%lprm(1),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
! set LOC_TO_GLOB array to all "-1" values
desc_a%lprm(1) = 0
desc_a%loc_to_glob(:) = -1
do i=1,m
k = desc_a%glob_to_loc(i)
if (k > 0) then
desc_a%loc_to_glob(k) = i
endif
enddo
end if
loc_row=counter
! check on parts function ! check on parts function
if (debug) write(*,*) 'PSB_CDALL: End main loop:' ,loc_row,itmpov,info if (debug) write(*,*) 'PSB_CDALL: End main loop:' ,loc_row,itmpov,info
@ -227,9 +347,8 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info)
allocate(ov_idx(l_ov_ix),ov_el(l_ov_el), stat=info) allocate(ov_idx(l_ov_ix),ov_el(l_ov_el), stat=info)
if (info /= no_err) then if (info /= no_err) then
info=4010 info=4010
char_err='psb_realloc'
err=info err=info
call psb_errpush(err,name,a_err=char_err) call psb_errpush(err,name,a_err='psb_realloc')
goto 9999 goto 9999
end if end if
@ -260,51 +379,32 @@ subroutine psb_cdals(m, n, parts, ictxt, desc_a, info)
call psb_transfer(ov_idx,desc_a%ovrlap_index,info) call psb_transfer(ov_idx,desc_a%ovrlap_index,info)
if (info == 0) call psb_transfer(ov_el,desc_a%ovrlap_elem,info) if (info == 0) call psb_transfer(ov_el,desc_a%ovrlap_elem,info)
deallocate(prc_v,temp_ovrlap,stat=info) if (info == 0) deallocate(prc_v,temp_ovrlap,stat=info)
if (info /= no_err) then if (info /= no_err) then
info=4000 info=4000
err=info err=info
call psb_errpush(err,name) call psb_errpush(err,name)
Goto 9999 Goto 9999
endif endif
! estimate local cols number ! At this point overlap_elem is OK.
loc_col=min(2*loc_row,m) desc_a%matrix_data(psb_ovl_state_) = psb_cd_ovl_asb_
allocate(desc_a%loc_to_glob(loc_col),&
&desc_a%lprm(1),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
! set LOC_TO_GLOB array to all "-1" values
desc_a%lprm(1) = 0
desc_a%loc_to_glob(:) = -1
do i=1,m
k = desc_a%glob_to_loc(i)
if (k > 0) then
desc_a%loc_to_glob(k) = i
endif
enddo
! set fields in desc_a%MATRIX_DATA.... ! set fields in desc_a%MATRIX_DATA....
desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_row_) = loc_row
desc_a%matrix_data(psb_n_col_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row
call psb_cd_set_bld(desc_a,info)
call psb_realloc(1,desc_a%halo_index, info) call psb_realloc(1,desc_a%halo_index, info)
if (info /= no_err) then if (info /= no_err) then
info=2025 info=2025
char_err='psb_realloc' call psb_errpush(err,name,a_err='psb_realloc')
call psb_errpush(err,name,a_err=char_err)
Goto 9999 Goto 9999
end if end if
desc_a%halo_index(:) = -1 desc_a%halo_index(:) = -1
desc_a%matrix_data(psb_m_) = m desc_a%matrix_data(psb_m_) = m
desc_a%matrix_data(psb_n_) = n desc_a%matrix_data(psb_n_) = n
desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_
desc_a%matrix_data(psb_ctxt_) = ictxt desc_a%matrix_data(psb_ctxt_) = ictxt
call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_)) call psb_get_mpicomm(ictxt,desc_a%matrix_data(psb_mpi_c_))

@ -58,7 +58,7 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag)
Integer :: counter,i,j,np,me,loc_row,err,& Integer :: counter,i,j,np,me,loc_row,err,&
& loc_col,nprocs,m,n,itmpov, k,glx,gidx,gle,& & loc_col,nprocs,m,n,itmpov, k,glx,gidx,gle,&
& l_ov_ix,l_ov_el,idx, flag_, err_act & l_ov_ix,l_ov_el,idx, flag_, err_act
integer :: int_err(5),exch(2) integer :: int_err(5),exch(3)
Integer, allocatable :: temp_ovrlap(:), ov_idx(:),ov_el(:) Integer, allocatable :: temp_ovrlap(:), ov_idx(:),ov_el(:)
logical, parameter :: debug=.false. logical, parameter :: debug=.false.
character(len=20) :: name character(len=20) :: name
@ -93,29 +93,27 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag)
goto 9999 goto 9999
end if end if
if (debug) write(*,*) 'psb_cdall: doing global checks'
!global check on m and n parameters
if (me == psb_root_) then if (me == psb_root_) then
exch(1)=m exch(1)=m
exch(2)=n exch(2)=n
call psb_bcast(ictxt,exch(1:2),root=psb_root_) exch(3)=psb_cd_get_large_threshold()
call psb_bcast(ictxt,exch(1:3),root=psb_root_)
else else
call psb_bcast(ictxt,exch(1:2),root=psb_root_) call psb_bcast(ictxt,exch(1:3),root=psb_root_)
if (exch(1) /= m) then if (exch(1) /= m) then
info=550 err=550
int_err(1)=1 int_err(1)=1
call psb_errpush(err,name,int_err)
goto 9999
else if (exch(2) /= n) then else if (exch(2) /= n) then
info=550 err=550
int_err(1)=2 int_err(1)=2
call psb_errpush(err,name,int_err)
goto 9999
endif endif
call psb_cd_set_large_threshold(exch(3))
endif endif
if (info /= 0) then
call psb_errpush(info,name,i_err=int_err)
goto 9999
end if
call psb_nullify_desc(desc_a) call psb_nullify_desc(desc_a)
if (present(flag)) then if (present(flag)) then
@ -136,9 +134,11 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag)
if (m > psb_cd_get_large_threshold()) then if (m > psb_cd_get_large_threshold()) then
allocate(desc_a%matrix_data(psb_mdata_size_),& allocate(desc_a%matrix_data(psb_mdata_size_),&
&temp_ovrlap(m),stat=info) &temp_ovrlap(m),stat=info)
desc_a%matrix_data(psb_desc_size_) = psb_desc_large_
else else
allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),& allocate(desc_a%glob_to_loc(m),desc_a%matrix_data(psb_mdata_size_),&
&temp_ovrlap(m),stat=info) &temp_ovrlap(m),stat=info)
desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_
end if end if
if (info /= 0) then if (info /= 0) then
info=2025 info=2025
@ -154,7 +154,6 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag)
temp_ovrlap(:) = -1 temp_ovrlap(:) = -1
if ( m >psb_cd_get_large_threshold()) then if ( m >psb_cd_get_large_threshold()) then
desc_a%matrix_data(psb_dec_type_) = psb_desc_large_bld_
do i=1,m do i=1,m
@ -219,7 +218,6 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag)
else else
desc_a%matrix_data(psb_dec_type_) = psb_desc_bld_
do i=1,m do i=1,m
if (((v(i)-flag_) > np-1).or.((v(i)-flag_) < 0)) then if (((v(i)-flag_) > np-1).or.((v(i)-flag_) < 0)) then
@ -336,13 +334,14 @@ subroutine psb_cdalv(v, ictxt, desc_a, info, flag)
! set fields in desc_a%MATRIX_DATA.... ! set fields in desc_a%MATRIX_DATA....
desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_row_) = loc_row
desc_a%matrix_data(psb_n_col_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row
call psb_cd_set_bld(desc_a,info)
allocate(desc_a%halo_index(1),stat=info) call psb_realloc(1,desc_a%halo_index, info)
if (info /= 0) then if (info /= no_err) then
info=4000 info=2025
call psb_errpush(info,name) call psb_errpush(err,name,a_err='psb_realloc')
goto 9999 Goto 9999
endif end if
desc_a%halo_index(:) = -1 desc_a%halo_index(:) = -1

@ -128,21 +128,14 @@ subroutine psb_cdasb(desc_a,info)
call psb_errpush(info,name) call psb_errpush(info,name)
goto 9999 goto 9999
end if end if
! Ok, register into MATRIX_DATA & free temporary work areas
if (psb_is_large_dec(dectype) ) then desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_
desc_a%matrix_data(psb_dec_type_) = psb_desc_large_asb_
!!$ write(0,*) 'Done large dec asmbly',desc_a%matrix_data(psb_dec_type_),&
!!$ & psb_desc_large_asb_,psb_is_asb_dec(desc_a%matrix_data(psb_dec_type_))
else
! Ok, register into MATRIX_DATA & free temporary work areas
desc_a%matrix_data(psb_dec_type_) = psb_desc_asb_
endif
else else
info = 600 info = 600
call psb_errpush(info,name) call psb_errpush(info,name)
goto 9999 goto 9999
if (debug) write(0,*) 'dectype 2 :',dectype,psb_desc_bld_,& if (debug) write(0,*) 'dectype 2 :',psb_cd_get_dectype(desc_a),&
&psb_desc_asb_,psb_desc_upd_ &psb_desc_bld_,psb_desc_asb_,psb_desc_upd_
endif endif
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)

@ -184,6 +184,9 @@ subroutine psb_cdrep(m, ictxt, desc_a, info)
call psb_errpush(info,name,i_err=int_err) call psb_errpush(info,name,i_err=int_err)
goto 9999 goto 9999
endif endif
! If the index space is replicated there's no point in having
! the AVL tree structure....
desc_a%matrix_data(psb_desc_size_) = psb_desc_normal_
desc_a%matrix_data(psb_m_) = m desc_a%matrix_data(psb_m_) = m

@ -156,86 +156,168 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info)
l_tmp_halo = novr*(3*Size(desc_a%halo_index)) l_tmp_halo = novr*(3*Size(desc_a%halo_index))
if (psb_is_large_desc(desc_a)) then if (psb_is_large_desc(desc_a)) then
desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_ desc_ov%matrix_data(psb_desc_size_) = psb_desc_large_
else else
desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ desc_ov%matrix_data(psb_desc_size_) = psb_desc_normal_
end if end if
call psb_cd_set_bld(desc_ov,info)
!!$ desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_
If(debug) then If(debug) then
Write(0,*)'Start cdovrbld',me,lworks,lworkr Write(0,*)'Start cdovrbld',me,lworks,lworkr
call psb_barrier(ictxt) call psb_barrier(ictxt)
endif endif
if (.false.) then
!
! The real work goes on in here....
!
Call psb_cdovrbld(novr,desc_ov,desc_a,a,&
& l_tmp_halo,l_tmp_ovr_idx,lworks,lworkr,info)
if (info /= 0) then
info=4010
ch_err='psb_cdovrbld'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
If(debug) then Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info)
Write(0,*)'Done cdovrbld',me,lworks,lworkr if (info /= 0) then
call psb_barrier(ictxt) call psb_errpush(4010,name,a_err='Allocate')
endif goto 9999
end if
else Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),&
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) call psb_sp_all(blk,max(lworks,lworkr),info)
if (info /= 0) then if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate') info=4010
goto 9999 ch_err='psb_sp_all'
end if call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& blk%fida='COO'
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
& halo(size(desc_a%halo_index)),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
Do j=0,n_elem_recv-1
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_ovr_idx(counter_o)=proc
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
end Do
counter=counter+n_elem_recv+n_elem_send+2
end Do
call psb_sp_all(blk,max(lworks,lworkr),info)
if (info /= 0) then
info=4010
ch_err='psb_sp_all'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
blk%fida='COO'
Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
& halo(size(desc_a%halo_index)),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
!
! A picture is in order to understand what goes on here.
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr
!
! At this point, halo contains a valid halo corresponding to the
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
!
bsdindx(:) = 0
sdsz(:) = 0
brvindx(:) = 0
rvsz(:) = 0
idxr = 0
idxs = 0
counter = 1
counter_t = 1
Do While (halo(counter) /= -1)
tot_elem=0
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999
end If
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv
!
!
! The format of the halo vector exists in two forms: 1. Temporary
! 2. Assembled. In this loop we are using the (assembled) halo_in and
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
!
! add recv elements in halo_index into ovrlap_index
!
Do j=0,n_elem_recv-1 Do j=0,n_elem_recv-1
If((counter+psb_elem_recv_+j)>Size(halo)) then
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) idx = halo(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then If(idx > Size(desc_ov%loc_to_glob)) then
info=-3 info=-3
call psb_errpush(info,name) call psb_errpush(info,name)
@ -256,442 +338,340 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info)
tmp_ovr_idx(counter_o+2)=gidx tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1 tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3 counter_o=counter_o+3
end Do if (.not.psb_is_large_desc(desc_ov)) then
counter=counter+n_elem_recv+n_elem_send+2 call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
end Do if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_halo(counter_h)=proc
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
! counter_h=counter_h+3
! A picture is in order to understand what goes on here. end if
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
! !
! At this point, halo contains a valid halo corresponding to the ! add send elements in halo_index into ovrlap_index
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
! !
bsdindx(:) = 0 Do j=0,n_elem_send-1
sdsz(:) = 0
brvindx(:) = 0 idx = halo(counter+psb_elem_send_+j)
rvsz(:) = 0 gidx = desc_ov%loc_to_glob(idx)
idxr = 0 if (idx > psb_cd_get_local_rows(Desc_a)) &
idxs = 0 & write(0,*) me,i_ovr,'Out of local rows ',&
counter = 1 & idx,psb_cd_get_local_rows(Desc_a)
counter_t = 1
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
Do While (halo(counter) /= -1) info=4010
tot_elem=0 call psb_errpush(info,name,a_err='psb_check_size')
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999 goto 9999
end If end if
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv tmp_ovr_idx(counter_o)=proc
! tmp_ovr_idx(counter_o+1)=1
! tmp_ovr_idx(counter_o+2)=gidx
! The format of the halo vector exists in two forms: 1. Temporary tmp_ovr_idx(counter_o+3)=-1
! 2. Assembled. In this loop we are using the (assembled) halo_in and counter_o=counter_o+3
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
! !
! add recv elements in halo_index into ovrlap_index ! Prepare to exchange the halo rows with the other proc.
! !
Do j=0,n_elem_recv-1 If (i_ovr < (novr)) Then
If((counter+psb_elem_recv_+j)>Size(halo)) then n_elem = psb_sp_get_nnz_row(idx,a)
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = halo(counter+psb_elem_recv_+j) call psb_check_size((idxs+tot_elem+n_elem),works,info)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') call psb_errpush(info,name,a_err='psb_check_size')
goto 9999 goto 9999
end if end if
tmp_ovr_idx(counter_o)=proc If((n_elem) > size(blk%ia2)) Then
tmp_ovr_idx(counter_o+1)=1 isz = max((3*size(blk%ia2))/2,(n_elem))
tmp_ovr_idx(counter_o+2)=gidx if (debug) write(0,*) me,'Realloc blk',isz
tmp_ovr_idx(counter_o+3)=-1 call psb_sp_reall(blk,isz,info)
counter_o=counter_o+3
if (.not.psb_is_large_desc(desc_ov)) then
call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
End If
tmp_halo(counter_h)=proc call psb_sp_getblk(idx,a,blk,info)
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
counter_h=counter_h+3
end if
Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
!
! add send elements in halo_index into ovrlap_index
!
Do j=0,n_elem_send-1
idx = halo(counter+psb_elem_send_+j)
gidx = desc_ov%loc_to_glob(idx)
if (idx > psb_cd_get_local_rows(Desc_a)) &
& write(0,*) me,i_ovr,'Out of local rows ',&
& idx,psb_cd_get_local_rows(Desc_a)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr
Do jj=1,n_elem
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj))
End Do
tot_elem=tot_elem+n_elem
End If
tmp_ovr_idx(counter_o)=proc Enddo
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
!
! Prepare to exchange the halo rows with the other proc.
!
If (i_ovr < (novr)) Then
n_elem = psb_sp_get_nnz_row(idx,a)
call psb_check_size((idxs+tot_elem+n_elem),works,info)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
If((n_elem) > size(blk%ia2)) Then if (i_ovr < novr) then
isz = max((3*size(blk%ia2))/2,(n_elem)) if (tot_elem > 1) then
if (debug) write(0,*) me,'Realloc blk',isz call imsr(tot_elem,works(idxs+1))
call psb_sp_reall(blk,isz,info) lx = works(idxs+1)
if (info /= 0) then i = 1
info=4010 j = 1
ch_err='psb_sp_reall' do
call psb_errpush(info,name,a_err=ch_err) j = j + 1
goto 9999 if (j > tot_elem) exit
end if if (works(idxs+j) /= lx) then
End If i = i + 1
works(idxs+i) = works(idxs+j)
call psb_sp_getblk(idx,a,blk,info) lx = works(idxs+i)
if (info /= 0) then
info=4010
ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr end do
Do jj=1,n_elem tot_elem = i
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) endif
End Do if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10)
tot_elem=tot_elem+n_elem sdsz(proc+1) = tot_elem
End If idxs = idxs + tot_elem
end if
Enddo counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo
if (i_ovr < novr) then if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
if (tot_elem > 1) then
call imsr(tot_elem,works(idxs+1)) if (i_ovr < novr) then
lx = works(idxs+1) !
i = 1 ! Exchange data requests with everybody else: so far we have
j = 1 ! accumulated RECV requests, we have an all-to-all to build
do ! matchings SENDs.
j = j + 1 !
if (j > tot_elem) exit call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (works(idxs+j) /= lx) then if (info /= 0) then
i = i + 1 info=4010
works(idxs+i) = works(idxs+j) ch_err='mpi_alltoall'
lx = works(idxs+i) call psb_errpush(info,name,a_err=ch_err)
end if goto 9999
end do end if
tot_elem = i idxs = 0
endif idxr = 0
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) counter = 1
sdsz(proc+1) = tot_elem Do
idxs = idxs + tot_elem proc=halo(counter)
end if if (proc == -1) exit
n_elem_recv = halo(counter+psb_n_elem_recv_)
counter = counter+n_elem_recv
n_elem_send = halo(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3 counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo Enddo
if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
iszr=sum(rvsz)
if (i_ovr < novr) then if (max(iszr,1) > lworkr) then
! call psb_realloc(max(iszr,1),workr,info)
! Exchange data requests with everybody else: so far we have
! accumulated RECV requests, we have an all-to-all to build
! matchings SENDs.
!
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoall' ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err) call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
idxs = 0 lworkr=max(iszr,1)
idxr = 0 end if
counter = 1
Do call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,&
proc=halo(counter) & workr,rvsz,brvindx,mpi_integer,icomm,info)
if (proc == -1) exit if (info /= 0) then
n_elem_recv = halo(counter+psb_n_elem_recv_) info=4010
counter = counter+n_elem_recv ch_err='mpi_alltoallv'
n_elem_send = halo(counter+psb_n_elem_send_) call psb_errpush(info,name,a_err=ch_err)
goto 9999
bsdindx(proc+1) = idxs end if
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3
Enddo
iszr=sum(rvsz)
if (max(iszr,1) > lworkr) then
call psb_realloc(max(iszr,1),workr,info)
if (info /= 0) then
info=4010
ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
lworkr=max(iszr,1)
end if
call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& if (debug) write(0,*) 'ISZR :',iszr
& workr,rvsz,brvindx,mpi_integer,icomm,info)
if (psb_is_large_desc(desc_a)) then
call psb_check_size(iszr,maskr,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoallv' call psb_errpush(info,name,a_err='psb_check_size')
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info)
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (debug) write(0,*) 'ISZR :',iszr desc_ov%glob_to_loc(idx)=n_col
desc_ov%loc_to_glob(n_col)=idx
if (psb_is_large_desc(desc_a)) then call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
call psb_check_size(iszr,maskr,info) if (info /= 0) then
if (info /= 0) then info=4010
info=4010 call psb_errpush(info,name,a_err='psb_check_size')
call psb_errpush(info,name,a_err='psb_check_size') goto 9999
goto 9999
end if
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info)
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
desc_ov%glob_to_loc(idx)=n_col
desc_ov%loc_to_glob(n_col)=idx
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
If (i_ovr < (novr)) Then end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
t_halo_in(counter_t)=-1 If (i_ovr < (novr)) Then
if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10) t_halo_in(counter_t)=-1
if (debug) write(0,*) me,'Calling Crea_Halo'
call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,& if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10)
& nxch,nsnd,nrcv,info) if (debug) write(0,*) me,'Calling Crea_Halo'
if (debug) then call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,&
write(0,*) me,'Done Crea_Index' & nxch,nsnd,nrcv,info)
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
End Do if (debug) then
write(0,*) me,'Done Crea_Index'
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a) End Do
desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1 desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a)
tmp_ovr_idx(counter_o:)=-1 desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1
tmp_ovr_idx(counter_o:)=-1
!
! At this point we have gathered all the indices in the halo at
! N levels of overlap. Just call cnv_dsc. This is
! the same routine as gets called inside CDASB.
!
if (debug) then !
write(0,*) 'psb_cdovrbld: converting indexes' ! At this point we have gathered all the indices in the halo at
call psb_barrier(ictxt) ! N levels of overlap. Just call cnv_dsc. This is
end if ! the same routine as gets called inside CDASB.
!.... convert comunication stuctures.... !
! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info) if (debug) then
if (info /= 0) then write(0,*) 'psb_cdovrbld: converting indexes'
ch_err='sp_free' call psb_barrier(ictxt)
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/)) end if
goto 9999 !.... convert comunication stuctures....
end if ! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info)
if (info /= 0) then
ch_err='sp_free'
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/))
goto 9999
end if end if
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)

@ -100,11 +100,11 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
If(debug) Write(0,*)'IN CDOVR1',novr ,m,nnzero,n_col If(debug) Write(0,*)'IN CDOVR1',novr ,m,nnzero,n_col
if (novr<0) then if (novr<0) then
info=10 info=10
int_err(1)=1 int_err(1)=1
int_err(2)=novr int_err(2)=novr
call psb_errpush(info,name,i_err=int_err) call psb_errpush(info,name,i_err=int_err)
goto 9999 goto 9999
endif endif
if (debug) write(0,*) 'Calling desccpy' if (debug) write(0,*) 'Calling desccpy'
@ -137,9 +137,9 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
! !
nztot = psb_sp_get_nnzeros(a) nztot = psb_sp_get_nnzeros(a)
if (nztot>0) then if (nztot>0) then
lovr = ((nztot+m-1)/m)*nhalo*novr lovr = ((nztot+m-1)/m)*nhalo*novr
lworks = ((nztot+m-1)/m)*nhalo lworks = ((nztot+m-1)/m)*nhalo
lworkr = ((nztot+m-1)/m)*nhalo lworkr = ((nztot+m-1)/m)*nhalo
else else
info=-1 info=-1
call psb_errpush(info,name) call psb_errpush(info,name)
@ -155,86 +155,168 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
l_tmp_halo = novr*(3*Size(desc_a%halo_index)) l_tmp_halo = novr*(3*Size(desc_a%halo_index))
if (psb_is_large_desc(desc_a)) then if (psb_is_large_desc(desc_a)) then
desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_ desc_ov%matrix_data(psb_desc_size_) = psb_desc_large_
else else
desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ desc_ov%matrix_data(psb_desc_size_) = psb_desc_normal_
end if end if
call psb_cd_set_bld(desc_ov,info)
!!$ desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_
If(debug) then If(debug) then
Write(0,*)'Start cdovrbld',me,lworks,lworkr Write(0,*)'Start cdovrbld',me,lworks,lworkr
call psb_barrier(ictxt) call psb_barrier(ictxt)
endif endif
if (.false.) then
!
! The real work goes on in here....
!
Call psb_cdovrbld(novr,desc_ov,desc_a,a,&
& l_tmp_halo,l_tmp_ovr_idx,lworks,lworkr,info)
if (info /= 0) then
info=4010
ch_err='psb_cdovrbld'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
If(debug) then Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info)
Write(0,*)'Done cdovrbld',me,lworks,lworkr if (info /= 0) then
call psb_barrier(ictxt) call psb_errpush(4010,name,a_err='Allocate')
endif goto 9999
end if
else Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),&
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) call psb_sp_all(blk,max(lworks,lworkr),info)
if (info /= 0) then if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate') info=4010
goto 9999 ch_err='psb_sp_all'
end if call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& blk%fida='COO'
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
& halo(size(desc_a%halo_index)),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
Do j=0,n_elem_recv-1
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_ovr_idx(counter_o)=proc
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
end Do
counter=counter+n_elem_recv+n_elem_send+2
end Do
call psb_sp_all(blk,max(lworks,lworkr),info)
if (info /= 0) then
info=4010
ch_err='psb_sp_all'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
blk%fida='COO'
Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
& halo(size(desc_a%halo_index)),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
!
! A picture is in order to understand what goes on here.
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr
!
! At this point, halo contains a valid halo corresponding to the
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
!
bsdindx(:) = 0
sdsz(:) = 0
brvindx(:) = 0
rvsz(:) = 0
idxr = 0
idxs = 0
counter = 1
counter_t = 1
Do While (halo(counter) /= -1)
tot_elem=0
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999
end If
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv
!
!
! The format of the halo vector exists in two forms: 1. Temporary
! 2. Assembled. In this loop we are using the (assembled) halo_in and
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
!
! add recv elements in halo_index into ovrlap_index
!
Do j=0,n_elem_recv-1 Do j=0,n_elem_recv-1
If((counter+psb_elem_recv_+j)>Size(halo)) then
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) idx = halo(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then If(idx > Size(desc_ov%loc_to_glob)) then
info=-3 info=-3
call psb_errpush(info,name) call psb_errpush(info,name)
@ -255,442 +337,340 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
tmp_ovr_idx(counter_o+2)=gidx tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1 tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3 counter_o=counter_o+3
end Do if (.not.psb_is_large_desc(desc_ov)) then
counter=counter+n_elem_recv+n_elem_send+2 call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
end Do if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_halo(counter_h)=proc
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
! counter_h=counter_h+3
! A picture is in order to understand what goes on here. end if
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
! !
! At this point, halo contains a valid halo corresponding to the ! add send elements in halo_index into ovrlap_index
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
! !
bsdindx(:) = 0 Do j=0,n_elem_send-1
sdsz(:) = 0
brvindx(:) = 0 idx = halo(counter+psb_elem_send_+j)
rvsz(:) = 0 gidx = desc_ov%loc_to_glob(idx)
idxr = 0 if (idx > psb_cd_get_local_rows(Desc_a)) &
idxs = 0 & write(0,*) me,i_ovr,'Out of local rows ',&
counter = 1 & idx,psb_cd_get_local_rows(Desc_a)
counter_t = 1
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
Do While (halo(counter) /= -1) info=4010
tot_elem=0 call psb_errpush(info,name,a_err='psb_check_size')
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999 goto 9999
end If end if
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv tmp_ovr_idx(counter_o)=proc
! tmp_ovr_idx(counter_o+1)=1
! tmp_ovr_idx(counter_o+2)=gidx
! The format of the halo vector exists in two forms: 1. Temporary tmp_ovr_idx(counter_o+3)=-1
! 2. Assembled. In this loop we are using the (assembled) halo_in and counter_o=counter_o+3
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
! !
! add recv elements in halo_index into ovrlap_index ! Prepare to exchange the halo rows with the other proc.
! !
Do j=0,n_elem_recv-1 If (i_ovr < (novr)) Then
If((counter+psb_elem_recv_+j)>Size(halo)) then n_elem = psb_sp_get_nnz_row(idx,a)
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = halo(counter+psb_elem_recv_+j) call psb_check_size((idxs+tot_elem+n_elem),works,info)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') call psb_errpush(info,name,a_err='psb_check_size')
goto 9999 goto 9999
end if end if
tmp_ovr_idx(counter_o)=proc If((n_elem) > size(blk%ia2)) Then
tmp_ovr_idx(counter_o+1)=1 isz = max((3*size(blk%ia2))/2,(n_elem))
tmp_ovr_idx(counter_o+2)=gidx if (debug) write(0,*) me,'Realloc blk',isz
tmp_ovr_idx(counter_o+3)=-1 call psb_sp_reall(blk,isz,info)
counter_o=counter_o+3
if (.not.psb_is_large_desc(desc_ov)) then
call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
End If
tmp_halo(counter_h)=proc call psb_sp_getblk(idx,a,blk,info)
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
counter_h=counter_h+3
end if
Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
!
! add send elements in halo_index into ovrlap_index
!
Do j=0,n_elem_send-1
idx = halo(counter+psb_elem_send_+j)
gidx = desc_ov%loc_to_glob(idx)
if (idx > psb_cd_get_local_rows(Desc_a)) &
& write(0,*) me,i_ovr,'Out of local rows ',&
& idx,psb_cd_get_local_rows(Desc_a)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr
Do jj=1,n_elem
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj))
End Do
tot_elem=tot_elem+n_elem
End If
tmp_ovr_idx(counter_o)=proc Enddo
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
!
! Prepare to exchange the halo rows with the other proc.
!
If (i_ovr < (novr)) Then
n_elem = psb_sp_get_nnz_row(idx,a)
call psb_check_size((idxs+tot_elem+n_elem),works,info) if (i_ovr < novr) then
if (info /= 0) then if (tot_elem > 1) then
info=4010 call imsr(tot_elem,works(idxs+1))
call psb_errpush(info,name,a_err='psb_check_size') lx = works(idxs+1)
goto 9999 i = 1
j = 1
do
j = j + 1
if (j > tot_elem) exit
if (works(idxs+j) /= lx) then
i = i + 1
works(idxs+i) = works(idxs+j)
lx = works(idxs+i)
end if end if
end do
If((n_elem) > size(blk%ia2)) Then tot_elem = i
isz = max((3*size(blk%ia2))/2,(n_elem)) endif
if (debug) write(0,*) me,'Realloc blk',isz if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10)
call psb_sp_reall(blk,isz,info) sdsz(proc+1) = tot_elem
if (info /= 0) then idxs = idxs + tot_elem
info=4010 end if
ch_err='psb_sp_reall' counter = counter+n_elem_send+3
call psb_errpush(info,name,a_err=ch_err) if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
goto 9999 Enddo
end if if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
End If
if (i_ovr < novr) then
call psb_sp_getblk(idx,a,blk,info) !
if (info /= 0) then ! Exchange data requests with everybody else: so far we have
info=4010 ! accumulated RECV requests, we have an all-to-all to build
ch_err='psb_sp_getblk' ! matchings SENDs.
call psb_errpush(info,name,a_err=ch_err) !
goto 9999 call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
end if if (info /= 0) then
!!$ write(0,*) me,'Iteration: ',j,i_ovr info=4010
Do jj=1,n_elem ch_err='mpi_alltoall'
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) call psb_errpush(info,name,a_err=ch_err)
End Do goto 9999
tot_elem=tot_elem+n_elem end if
End If idxs = 0
idxr = 0
Enddo counter = 1
Do
proc=halo(counter)
if (i_ovr < novr) then if (proc == -1) exit
if (tot_elem > 1) then n_elem_recv = halo(counter+psb_n_elem_recv_)
call imsr(tot_elem,works(idxs+1)) counter = counter+n_elem_recv
lx = works(idxs+1) n_elem_send = halo(counter+psb_n_elem_send_)
i = 1
j = 1 bsdindx(proc+1) = idxs
do idxs = idxs + sdsz(proc+1)
j = j + 1 brvindx(proc+1) = idxr
if (j > tot_elem) exit idxr = idxr + rvsz(proc+1)
if (works(idxs+j) /= lx) then
i = i + 1
works(idxs+i) = works(idxs+j)
lx = works(idxs+i)
end if
end do
tot_elem = i
endif
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10)
sdsz(proc+1) = tot_elem
idxs = idxs + tot_elem
end if
counter = counter+n_elem_send+3 counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo Enddo
if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
iszr=sum(rvsz)
if (i_ovr < novr) then if (max(iszr,1) > lworkr) then
! call psb_realloc(max(iszr,1),workr,info)
! Exchange data requests with everybody else: so far we have
! accumulated RECV requests, we have an all-to-all to build
! matchings SENDs.
!
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoall' ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err) call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
idxs = 0 lworkr=max(iszr,1)
idxr = 0 end if
counter = 1
Do call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,&
proc=halo(counter) & workr,rvsz,brvindx,mpi_integer,icomm,info)
if (proc == -1) exit if (info /= 0) then
n_elem_recv = halo(counter+psb_n_elem_recv_) info=4010
counter = counter+n_elem_recv ch_err='mpi_alltoallv'
n_elem_send = halo(counter+psb_n_elem_send_) call psb_errpush(info,name,a_err=ch_err)
goto 9999
bsdindx(proc+1) = idxs end if
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3
Enddo
iszr=sum(rvsz)
if (max(iszr,1) > lworkr) then
call psb_realloc(max(iszr,1),workr,info)
if (info /= 0) then
info=4010
ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
lworkr=max(iszr,1)
end if
call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& if (debug) write(0,*) 'ISZR :',iszr
& workr,rvsz,brvindx,mpi_integer,icomm,info)
if (psb_is_large_desc(desc_a)) then
call psb_check_size(iszr,maskr,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoallv' call psb_errpush(info,name,a_err='psb_check_size')
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info)
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (debug) write(0,*) 'ISZR :',iszr t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (psb_is_large_desc(desc_a)) then desc_ov%glob_to_loc(idx)=n_col
call psb_check_size(iszr,maskr,info) desc_ov%loc_to_glob(n_col)=idx
if (info /= 0) then
info=4010 call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
call psb_errpush(info,name,a_err='psb_check_size') if (info /= 0) then
goto 9999 info=4010
end if call psb_errpush(info,name,a_err='psb_check_size')
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info) goto 9999
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
desc_ov%glob_to_loc(idx)=n_col
desc_ov%loc_to_glob(n_col)=idx
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
If (i_ovr < (novr)) Then end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
t_halo_in(counter_t)=-1 If (i_ovr < (novr)) Then
if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10) t_halo_in(counter_t)=-1
if (debug) write(0,*) me,'Calling Crea_Halo'
call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,& if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10)
& nxch,nsnd,nrcv,info) if (debug) write(0,*) me,'Calling Crea_Halo'
if (debug) then call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,&
write(0,*) me,'Done Crea_Index' & nxch,nsnd,nrcv,info)
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
End Do if (debug) then
write(0,*) me,'Done Crea_Index'
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a) End Do
desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1 desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a)
tmp_ovr_idx(counter_o:)=-1 desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1
tmp_ovr_idx(counter_o:)=-1
!
! At this point we have gathered all the indices in the halo at
! N levels of overlap. Just call cnv_dsc. This is
! the same routine as gets called inside CDASB.
!
if (debug) then !
write(0,*) 'psb_cdovrbld: converting indexes' ! At this point we have gathered all the indices in the halo at
call psb_barrier(ictxt) ! N levels of overlap. Just call cnv_dsc. This is
end if ! the same routine as gets called inside CDASB.
!.... convert comunication stuctures.... !
! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info) if (debug) then
if (info /= 0) then write(0,*) 'psb_cdovrbld: converting indexes'
ch_err='sp_free' call psb_barrier(ictxt)
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/)) end if
goto 9999 !.... convert comunication stuctures....
end if ! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info)
if (info /= 0) then
ch_err='sp_free'
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/))
goto 9999
end if end if
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)

Loading…
Cancel
Save