Fixed sphalo to use getrow, not getblk, to avoid problems with

subsequente all-to-all.
Fixed getblk to use fixcoo by default. 
Fixed spclip to use getrow, and save on memory space. 
Should we change fixcoo name?
psblas3-type-indexed
Salvatore Filippone 18 years ago
parent d4d12dbb36
commit c2377670eb

@ -402,25 +402,27 @@ module psb_serial_mod
end interface
interface psb_sp_getblk
subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw)
subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw,srt)
use psb_spmat_type
type(psb_dspmat_type), intent(in) :: a
integer, intent(in) :: irw
type(psb_dspmat_type), intent(inout) :: b
integer, intent(out) :: info
logical, intent(in), optional :: append
integer, intent(in), target, optional :: iren(:)
integer, intent(in), optional :: lrw
integer, intent(out) :: info
logical, intent(in), optional :: srt
end subroutine psb_dspgtblk
subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw)
subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw,srt)
use psb_spmat_type
type(psb_zspmat_type), intent(in) :: a
integer, intent(in) :: irw
type(psb_zspmat_type), intent(inout) :: b
integer, intent(out) :: info
logical, intent(in), optional :: append
integer, intent(in), target, optional :: iren(:)
integer, intent(in), optional :: lrw
integer, intent(out) :: info
logical, intent(in), optional :: srt
end subroutine psb_zspgtblk
end interface

@ -48,12 +48,11 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
logical, intent(in), optional :: rscale,cscale
integer :: lrw_, ierr(5), err_act
type(psb_dspmat_type) :: tmp
character(len=20) :: name, ch_err
integer :: imin_,imax_,jmin_,jmax_
logical :: rscale_,cscale_
integer :: sizeb, nzb, mb, kb, ifst, ilst, nrt, nzt, i, j
integer, parameter :: irbk=40, inzr=16
integer, parameter :: irbk=40
name='psb_dsp_clip'
info = 0
@ -61,7 +60,6 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
call psb_set_erraction(0)
call psb_nullify_sp(b)
call psb_sp_all(tmp,inzr*irbk,info)
if (present(imin)) then
@ -116,16 +114,17 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
nrt = min(irbk,imax_-i+1)
ifst = i
ilst = ifst + nrt - 1
call psb_sp_getblk(ifst,a,tmp,info,lrw=ilst)
nzt = psb_sp_get_nnzeros(tmp)
do j=1, nzt
if ((jmin_ <= tmp%ia2(j)).and.(tmp%ia2(j) <= jmax_)) then
call psb_sp_getrow(ifst,a,nzt,b%ia1,b%ia2,b%aspk,info,&
& lrw=ilst, append=.true.,nzin=nzb)
do j=nzb+1,nzb+nzt
if ((jmin_ <= b%ia2(j)).and.(b%ia2(j) <= jmax_)) then
nzb = nzb + 1
b%aspk(nzb) = tmp%aspk(j)
b%ia1(nzb) = tmp%ia1(j)
b%ia2(nzb) = tmp%ia2(j)
b%aspk(nzb) = b%aspk(j)
b%ia1(nzb) = b%ia1(j)
b%ia2(nzb) = b%ia2(j)
end if
end do
end do
b%infoa(psb_nnz_) = nzb
@ -141,7 +140,6 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
end if
call psb_fixcoo(b,info)
call psb_sp_trim(b,info)
call psb_sp_free(tmp,info)
call psb_erractionrestore(err_act)
return

@ -38,7 +38,7 @@
!* appending to B). Output is always COO. Input might be anything, *
!* *
!*****************************************************************************
subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw)
subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw,srt)
! Output is always in COO format into B, irrespective of
! the input format
use psb_spmat_type
@ -54,8 +54,9 @@ subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw)
logical, intent(in), optional :: append
integer, intent(in), target, optional :: iren(:)
integer, intent(in), optional :: lrw
logical, intent(in), optional :: srt
logical :: append_
logical :: append_,srt_
integer :: i,j,k,ip,jp,nr,idx, nz,iret,nzb, nza, lrw_, irw_, err_act
character(len=20) :: name, ch_err
@ -79,6 +80,12 @@ subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw)
append_ = .false.
endif
if (present(srt)) then
srt_ = srt
else
srt_ = .true.
endif
if (append_) then
nzb = b%infoa(psb_nnz_)
@ -103,7 +110,7 @@ subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw)
b%infoa(psb_nnz_) = nzb+nz
b%m = b%m+lrw_-irw+1
b%k = max(b%k,a%k)
if (srt_) call psb_fixcoo(b,info)
!!$ call psb_erractionrestore(err_act)
return

@ -48,12 +48,11 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
logical, intent(in), optional :: rscale,cscale
integer :: lrw_, ierr(5), err_act
type(psb_zspmat_type) :: tmp
character(len=20) :: name, ch_err
integer :: imin_,imax_,jmin_,jmax_
logical :: rscale_,cscale_
integer :: sizeb, nzb, mb, kb, ifst, ilst, nrt, nzt, i, j
integer, parameter :: irbk=40, inzr=16
integer, parameter :: irbk=40
name='psb_zsp_clip'
info = 0
@ -61,7 +60,6 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
call psb_set_erraction(0)
call psb_nullify_sp(b)
call psb_sp_all(tmp,inzr*irbk,info)
if (present(imin)) then
@ -116,16 +114,17 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
nrt = min(irbk,imax_-i+1)
ifst = i
ilst = ifst + nrt - 1
call psb_sp_getblk(ifst,a,tmp,info,lrw=ilst)
nzt = psb_sp_get_nnzeros(tmp)
do j=1, nzt
if ((jmin_ <= tmp%ia2(j)).and.(tmp%ia2(j) <= jmax_)) then
call psb_sp_getrow(ifst,a,nzt,b%ia1,b%ia2,b%aspk,info,&
& lrw=ilst, append=.true.,nzin=nzb)
do j=nzb+1,nzb+nzt
if ((jmin_ <= b%ia2(j)).and.(b%ia2(j) <= jmax_)) then
nzb = nzb + 1
b%aspk(nzb) = tmp%aspk(j)
b%ia1(nzb) = tmp%ia1(j)
b%ia2(nzb) = tmp%ia2(j)
b%aspk(nzb) = b%aspk(j)
b%ia1(nzb) = b%ia1(j)
b%ia2(nzb) = b%ia2(j)
end if
end do
end do
b%infoa(psb_nnz_) = nzb
@ -141,7 +140,6 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale)
end if
call psb_fixcoo(b,info)
call psb_sp_trim(b,info)
call psb_sp_free(tmp,info)
call psb_erractionrestore(err_act)
return

@ -38,7 +38,7 @@
!* appending to B). Output is always COO. Input might be anything, *
!* *
!*****************************************************************************
subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw)
subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw,srt)
! Output is always in COO format into B, irrespective of
! the input format
use psb_spmat_type
@ -54,8 +54,9 @@ subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw)
logical, intent(in), optional :: append
integer, intent(in), target, optional :: iren(:)
integer, intent(in), optional :: lrw
logical, intent(in), optional :: srt
logical :: append_
logical :: append_,srt_
integer :: i,j,k,ip,jp,nr,idx, nz,iret,nzb, nza, lrw_, irw_, err_act
character(len=20) :: name, ch_err
@ -79,6 +80,12 @@ subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw)
append_ = .false.
endif
if (present(srt)) then
srt_ = srt
else
srt_ = .true.
endif
if (append_) then
nzb = b%infoa(psb_nnz_)
@ -103,7 +110,7 @@ subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw)
b%infoa(psb_nnz_) = nzb+nz
b%m = b%m+lrw_-irw+1
b%k = max(b%k,a%k)
if (srt_) call psb_fixcoo(b,info)
!!$ call psb_erractionrestore(err_act)
return

@ -73,11 +73,11 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
Integer :: np,me,counter,proc,i, &
& n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& nrmin,data_
Type(psb_dspmat_type) :: tmp
& nrmin,data_,ngtz
Integer :: l1, icomm, err_act
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:)
& rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:)
real(kind(1.d0)), allocatable :: valsnd(:)
integer, pointer :: idxv(:)
logical :: rwcnv_,clcnv_,cliprow_
character(len=5) :: outfmt_
@ -220,11 +220,9 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_nullify_sp(tmp)
call psb_sp_all(0,0,tmp,max(iszs,1),info)
tmp%fida='COO'
call psb_sp_setifld(psb_spmat_asb_,psb_state_,tmp,info)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == 0) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == 0) call psb_ensure_size(max(iszs,1),valsnd,info)
if (debugprt) then
open(20+me)
@ -240,22 +238,23 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
tot_elem=0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = psb_sp_get_nnz_row(idx,a)
call psb_sp_getblk(idx,a,tmp,info,append=.true.)
call psb_sp_getrow(idx,a,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= 0) then
info=4010
ch_err='psb_sp_getblk'
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
@ -266,28 +265,23 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
counter = counter+n_el_send+3
Enddo
nz = tmp%infoa(psb_nnz_)
nz = tot_elem
if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I')
if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I')
if (rwcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (clcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= 0) then
info=4010
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (debugprt) then
open(30+me)
call psb_csprt(30+me,tmp,head='% SPHALO border SEND .')
close(30+me)
end if
call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_precision,&
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_precision,&
& blk%aspk,rvsz,brvindx,mpi_double_precision,icomm,info)
call mpi_alltoallv(tmp%ia1,sdsz,bsdindx,mpi_integer,&
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& blk%ia1,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(tmp%ia2,sdsz,bsdindx,mpi_integer,&
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& blk%ia2,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= 0) then
info=4010
@ -379,15 +373,9 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
!!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t6-t2,t7-t6,t8-t7,t3-t8
!!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t2-t1,t3-t2,t4-t3,t5-t4
Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,stat=info)
Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,&
& iasnd,jasnd,valsnd,stat=info)
call psb_sp_free(tmp,info)
if (info /= 0) then
info=4010
ch_err='psb_sp_free'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_erractionrestore(err_act)
return

@ -72,12 +72,12 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
! ...local scalars....
Integer :: np,me,counter,proc,i, &
& n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz, &
& nrmin,data_
Type(psb_zspmat_type) :: tmp
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& nrmin,data_,ngtz
Integer :: l1, icomm, err_act
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:)
& rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:)
complex(kind(1.d0)), allocatable :: valsnd(:)
integer, pointer :: idxv(:)
logical :: rwcnv_,clcnv_,cliprow_
character(len=5) :: outfmt_
@ -220,11 +220,17 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
end if
mat_recv = iszr
iszs=sum(sdsz)
call psb_nullify_sp(tmp)
call psb_sp_all(0,0,tmp,max(iszs,1),info)
tmp%fida='COO'
call psb_sp_setifld(psb_spmat_asb_,psb_state_,tmp,info)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == 0) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == 0) call psb_ensure_size(max(iszs,1),valsnd,info)
if (debugprt) then
open(20+me)
do i=1, psb_cd_get_local_cols(desc_a)
write(20+me,*) i,desc_a%loc_to_glob(i)
end do
close(20+me)
end if
t2 = psb_wtime()
l1 = 0
@ -232,22 +238,23 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
counter=1
idx = 0
tot_elem=0
Do
proc=idxv(counter)
if (proc == -1) exit
n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv
n_el_send=idxv(counter+psb_n_elem_send_)
tot_elem=0
Do j=0,n_el_send-1
idx = idxv(counter+psb_elem_send_+j)
n_elem = psb_sp_get_nnz_row(idx,a)
call psb_sp_getblk(idx,a,tmp,info,append=.true.)
call psb_sp_getrow(idx,a,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem)
if (info /= 0) then
info=4010
ch_err='psb_sp_getblk'
ch_err='psb_sp_getrow'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
@ -258,28 +265,23 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
counter = counter+n_el_send+3
Enddo
nz = tmp%infoa(psb_nnz_)
nz = tot_elem
if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I')
if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I')
if (rwcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I')
if (clcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I')
if (info /= 0) then
info=4010
ch_err='psb_loc_to_glob'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (debugprt) then
open(30+me)
call psb_csprt(30+me,tmp,head='% SPHALO border SEND .')
close(30+me)
end if
call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_complex,&
call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_complex,&
& blk%aspk,rvsz,brvindx,mpi_double_complex,icomm,info)
call mpi_alltoallv(tmp%ia1,sdsz,bsdindx,mpi_integer,&
call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,&
& blk%ia1,rvsz,brvindx,mpi_integer,icomm,info)
call mpi_alltoallv(tmp%ia2,sdsz,bsdindx,mpi_integer,&
call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,&
& blk%ia2,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= 0) then
info=4010
@ -371,15 +373,9 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
!!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t6-t2,t7-t6,t8-t7,t3-t8
!!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t2-t1,t3-t2,t4-t3,t5-t4
Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,stat=info)
Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,&
& iasnd,jasnd,valsnd,stat=info)
call psb_sp_free(tmp,info)
if (info /= 0) then
info=4010
ch_err='psb_sp_free'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_erractionrestore(err_act)
return

Loading…
Cancel
Save