Fixed borderline case for PSB_CDALL( VL=())

Code was hanging if VL was empty on some processes.
pizdaint-runs
Salvatore Filippone 5 years ago
parent a59c79a2f8
commit d342e2d59e

@ -107,7 +107,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info)
integer(psb_ipk_) :: ictxt,np,me, nresp integer(psb_ipk_) :: ictxt,np,me, nresp
integer(psb_ipk_), parameter :: nt=4 integer(psb_ipk_), parameter :: nt=4
integer(psb_ipk_) :: tmpv(4) integer(psb_ipk_) :: tmpv(4)
logical, parameter :: do_timings=.false., trace=.false. logical, parameter :: do_timings=.false., trace=.false., debugsz=.false.
integer(psb_ipk_), save :: idx_sweep0=-1, idx_loop_a2a=-1, idx_loop_neigh=-1 integer(psb_ipk_), save :: idx_sweep0=-1, idx_loop_a2a=-1, idx_loop_neigh=-1
real(psb_dpk_) :: t0, t1, t2, t3, t4 real(psb_dpk_) :: t0, t1, t2, t3, t4
character(len=20) :: name character(len=20) :: name
@ -180,6 +180,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info)
tmpv(3) = n_row tmpv(3) = n_row
tmpv(4) = psb_cd_get_maxspace() tmpv(4) = psb_cd_get_maxspace()
call psb_max(ictxt,tmpv) call psb_max(ictxt,tmpv)
nreqst_max = tmpv(2)
locr_max = tmpv(3) locr_max = tmpv(3)
maxspace = nt*locr_max maxspace = nt*locr_max
if (tmpv(4) > 0) maxspace = min(maxspace,tmpv(4)) if (tmpv(4) > 0) maxspace = min(maxspace,tmpv(4))
@ -191,6 +192,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info)
! Do a preliminary run on the user-defined adjacency lists ! Do a preliminary run on the user-defined adjacency lists
! !
if (trace.and.(me == 0)) write(0,*) ' Initial sweep on user-defined topology' if (trace.and.(me == 0)) write(0,*) ' Initial sweep on user-defined topology'
if (debugsz) write(0,*) me,' Initial sweep on user-defined topology',nreqst
nsampl_in = min(nreqst,max(1,(maxspace+max(1,nadj)-1))/(max(1,nadj))) nsampl_in = min(nreqst,max(1,(maxspace+max(1,nadj)-1))/(max(1,nadj)))
call psi_adj_fnd_sweep(idx,iprc,ladj,idxmap,nsampl_in,n_answers) call psi_adj_fnd_sweep(idx,iprc,ladj,idxmap,nsampl_in,n_answers)
call idxmap%xtnd_p_adjcncy(ladj) call idxmap%xtnd_p_adjcncy(ladj)
@ -198,11 +200,13 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info)
nreqst_max = nreqst nreqst_max = nreqst
call psb_max(ictxt,nreqst_max) call psb_max(ictxt,nreqst_max)
if (trace.and.(me == 0)) write(0,*) ' After initial sweep:',nreqst_max if (trace.and.(me == 0)) write(0,*) ' After initial sweep:',nreqst_max
if (debugsz) write(0,*) me,' After sweep on user-defined topology',nreqst_max
end if end if
if (do_timings) call psb_toc(idx_sweep0) if (do_timings) call psb_toc(idx_sweep0)
fnd_owner_loop: do while (nreqst_max>0) fnd_owner_loop: do while (nreqst_max>0)
if (do_timings) call psb_tic(idx_loop_a2a) if (do_timings) call psb_tic(idx_loop_a2a)
if (debugsz) write(0,*) me,' fnd_owner_loop',nreqst_max
! !
! The basic idea of this loop is to alternate between ! The basic idea of this loop is to alternate between
! searching through all processes and searching ! searching through all processes and searching
@ -221,12 +225,13 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info)
ipnt = 1 ipnt = 1
call psi_get_sample(ipnt, idx,iprc,tidx,tsmpl,nsampl_in,nsampl_out, pad=.true.) call psi_get_sample(ipnt, idx,iprc,tidx,tsmpl,nsampl_in,nsampl_out, pad=.true.)
nsampl_in = min(nsampl_out,nsampl_in) nsampl_in = min(nsampl_out,nsampl_in)
!!$ write(0,*) me,' From first sampling ',nsampl_in if (debugsz) write(0,*) me,' From first sampling ',nsampl_in
! !
! 2. Do a search on all processes; this is supposed to find ! 2. Do a search on all processes; this is supposed to find
! the owning process for all inputs; ! the owning process for all inputs;
! !
call psi_a2a_fnd_owner(tidx(1:nsampl_in),tprc,idxmap,info, samesize=.true.) call psi_a2a_fnd_owner(tidx(1:nsampl_in),tprc,idxmap,info, samesize=.true.)
if (debugsz) write(0,*) me,' From a2a_fnd_owner ',info
! !
! We might have padded when looking for owners, so the actual samples ! We might have padded when looking for owners, so the actual samples
! could be less than they appear. Should be explained better. ! could be less than they appear. Should be explained better.

@ -68,6 +68,7 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
integer(psb_lpk_), allocatable :: vl(:), ix(:), l_temp_ovrlap(:) integer(psb_lpk_), allocatable :: vl(:), ix(:), l_temp_ovrlap(:)
integer(psb_ipk_) :: debug_level, debug_unit integer(psb_ipk_) :: debug_level, debug_unit
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
logical, parameter :: debug_size=.false.
logical :: do_timings=.false. logical :: do_timings=.false.
logical :: check_, islarge, usehash_ logical :: check_, islarge, usehash_
character(len=20) :: name character(len=20) :: name
@ -146,12 +147,14 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
islarge = psb_cd_is_large_size(m) islarge = psb_cd_is_large_size(m)
allocate(vl(loc_row),ix(loc_row),stat=info) allocate(vl(max(loc_row,ione)),ix(max(loc_row,ione)),stat=info)
if (info /= psb_success_) then if (info /= psb_success_) then
info=psb_err_alloc_dealloc_ info=psb_err_alloc_dealloc_
call psb_errpush(info,name,l_err=l_err) call psb_errpush(info,name,l_err=l_err)
goto 9999 goto 9999
end if end if
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': sizes',loc_row,m,nrt,check_
! !
! Checks for valid input: ! Checks for valid input:
@ -162,6 +165,9 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
! !
if (check_.or.(.not.islarge)) then if (check_.or.(.not.islarge)) then
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Going for global checks'
allocate(tmpgidx(m,2),stat=info) allocate(tmpgidx(m,2),stat=info)
if (info /= psb_success_) then if (info /= psb_success_) then
info=psb_err_alloc_dealloc_ info=psb_err_alloc_dealloc_
@ -226,6 +232,9 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
write(psb_err_unit,*) trim(name),' : in the global sizes!',m,nrt write(psb_err_unit,*) trim(name),' : in the global sizes!',m,nrt
end if end if
end if end if
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': After global checks '
if (do_timings) then if (do_timings) then
call psb_barrier(ictxt) call psb_barrier(ictxt)
t1 = psb_wtime() t1 = psb_wtime()
@ -252,7 +261,7 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
end do end do
end if end if
call psb_msort(vl,ix,flag=psb_sort_keep_idx_) call psb_msort(vl,ix,flag=psb_sort_keep_idx_)
nlu = 1 nlu = min(1,loc_row)
do i=2,loc_row do i=2,loc_row
if (vl(i) /= vl(nlu)) then if (vl(i) /= vl(nlu)) then
nlu = nlu + 1 nlu = nlu + 1
@ -262,6 +271,9 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
end do end do
call psb_msort(ix(1:nlu),vl(1:nlu),flag=psb_sort_keep_idx_) call psb_msort(ix(1:nlu),vl(1:nlu),flag=psb_sort_keep_idx_)
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': After sort ',nlu
call psb_nullify_desc(desc) call psb_nullify_desc(desc)
if (do_timings) then if (do_timings) then
call psb_barrier(ictxt) call psb_barrier(ictxt)
@ -275,6 +287,9 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
! are safe. ! are safe.
! !
if (novrl > 0) then if (novrl > 0) then
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Check overlap '
if (debug_level >= psb_debug_ext_) & if (debug_level >= psb_debug_ext_) &
& write(debug_unit,*) me,' ',trim(name),': code for NOVRL>0',novrl,npr_ov & write(debug_unit,*) me,' ',trim(name),': code for NOVRL>0',novrl,npr_ov
@ -318,6 +333,9 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
call psb_msort(ov_idx(:,1),ix=ov_idx(:,2),flag=psb_sort_keep_idx_) call psb_msort(ov_idx(:,1),ix=ov_idx(:,2),flag=psb_sort_keep_idx_)
end if end if
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Done overlap '
! allocate work vector ! allocate work vector
allocate(l_temp_ovrlap(max(1,2*loc_row)),desc%lprm(1),& allocate(l_temp_ovrlap(max(1,2*loc_row)),desc%lprm(1),&
@ -373,6 +391,8 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
call psb_barrier(ictxt) call psb_barrier(ictxt)
t3 = psb_wtime() t3 = psb_wtime()
end if end if
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Allocate indxmap'
if (np == 1) then if (np == 1) then
allocate(psb_repl_map :: desc%indxmap, stat=info) allocate(psb_repl_map :: desc%indxmap, stat=info)
@ -391,6 +411,10 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
call aa%init(ictxt,vl(1:nlu),info) call aa%init(ictxt,vl(1:nlu),info)
end select end select
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Done init indxmap'
if (do_timings) then if (do_timings) then
call psb_barrier(ictxt) call psb_barrier(ictxt)
t4 = psb_wtime() t4 = psb_wtime()
@ -418,6 +442,8 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
end if end if
end block end block
if (info == psb_success_) call psi_bld_tmpovrl(temp_ovrlap,desc,info) if (info == psb_success_) call psi_bld_tmpovrl(temp_ovrlap,desc,info)
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Done bld_tmpovrl'
if (info == psb_success_) deallocate(temp_ovrlap,vl,ix,stat=info) if (info == psb_success_) deallocate(temp_ovrlap,vl,ix,stat=info)
if ((info == psb_success_).and.(allocated(tmpgidx)))& if ((info == psb_success_).and.(allocated(tmpgidx)))&
@ -453,6 +479,8 @@ subroutine psb_cd_inloc(v, ictxt, desc, info, globalcheck,idx,usehash)
write(0,*) ' Phase 5 : ', t5 write(0,*) ' Phase 5 : ', t5
end if end if
end if end if
if (debug_size) &
& write(debug_unit,*) me,' ',trim(name),': Done cd_inloc'
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return

Loading…
Cancel
Save