From c2377670eb94663ba8426edbb49871f8ca123f01 Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Mon, 9 Apr 2007 15:53:32 +0000 Subject: [PATCH] Fixed sphalo to use getrow, not getblk, to avoid problems with subsequente all-to-all. Fixed getblk to use fixcoo by default. Fixed spclip to use getrow, and save on memory space. Should we change fixcoo name? --- base/modules/psb_serial_mod.f90 | 10 +++--- base/serial/psb_dspclip.f90 | 26 +++++++-------- base/serial/psb_dspgtblk.f90 | 17 +++++++--- base/serial/psb_zspclip.f90 | 24 +++++++------- base/serial/psb_zspgtblk.f90 | 17 +++++++--- base/tools/psb_dsphalo.F90 | 50 +++++++++++----------------- base/tools/psb_zsphalo.F90 | 58 +++++++++++++++------------------ 7 files changed, 99 insertions(+), 103 deletions(-) diff --git a/base/modules/psb_serial_mod.f90 b/base/modules/psb_serial_mod.f90 index 635f3449..49bf3391 100644 --- a/base/modules/psb_serial_mod.f90 +++ b/base/modules/psb_serial_mod.f90 @@ -402,25 +402,27 @@ module psb_serial_mod end interface interface psb_sp_getblk - subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw) + subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw,srt) use psb_spmat_type type(psb_dspmat_type), intent(in) :: a integer, intent(in) :: irw type(psb_dspmat_type), intent(inout) :: b + integer, intent(out) :: info logical, intent(in), optional :: append integer, intent(in), target, optional :: iren(:) integer, intent(in), optional :: lrw - integer, intent(out) :: info + logical, intent(in), optional :: srt end subroutine psb_dspgtblk - subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw) + subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw,srt) use psb_spmat_type type(psb_zspmat_type), intent(in) :: a integer, intent(in) :: irw type(psb_zspmat_type), intent(inout) :: b + integer, intent(out) :: info logical, intent(in), optional :: append integer, intent(in), target, optional :: iren(:) integer, intent(in), optional :: lrw - integer, intent(out) :: info + logical, intent(in), optional :: srt end subroutine psb_zspgtblk end interface diff --git a/base/serial/psb_dspclip.f90 b/base/serial/psb_dspclip.f90 index 65dea802..192988e3 100644 --- a/base/serial/psb_dspclip.f90 +++ b/base/serial/psb_dspclip.f90 @@ -48,21 +48,19 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) logical, intent(in), optional :: rscale,cscale integer :: lrw_, ierr(5), err_act - type(psb_dspmat_type) :: tmp character(len=20) :: name, ch_err integer :: imin_,imax_,jmin_,jmax_ logical :: rscale_,cscale_ integer :: sizeb, nzb, mb, kb, ifst, ilst, nrt, nzt, i, j - integer, parameter :: irbk=40, inzr=16 - + integer, parameter :: irbk=40 + name='psb_dsp_clip' info = 0 call psb_erractionsave(err_act) call psb_set_erraction(0) call psb_nullify_sp(b) - call psb_sp_all(tmp,inzr*irbk,info) - + if (present(imin)) then imin_ = imin @@ -94,7 +92,7 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) else cscale_ = .true. endif - + if (rscale_) then mb = imax_ - imin_ +1 else @@ -116,16 +114,17 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) nrt = min(irbk,imax_-i+1) ifst = i ilst = ifst + nrt - 1 - call psb_sp_getblk(ifst,a,tmp,info,lrw=ilst) - nzt = psb_sp_get_nnzeros(tmp) - do j=1, nzt - if ((jmin_ <= tmp%ia2(j)).and.(tmp%ia2(j) <= jmax_)) then + call psb_sp_getrow(ifst,a,nzt,b%ia1,b%ia2,b%aspk,info,& + & lrw=ilst, append=.true.,nzin=nzb) + do j=nzb+1,nzb+nzt + if ((jmin_ <= b%ia2(j)).and.(b%ia2(j) <= jmax_)) then nzb = nzb + 1 - b%aspk(nzb) = tmp%aspk(j) - b%ia1(nzb) = tmp%ia1(j) - b%ia2(nzb) = tmp%ia2(j) + b%aspk(nzb) = b%aspk(j) + b%ia1(nzb) = b%ia1(j) + b%ia2(nzb) = b%ia2(j) end if end do + end do b%infoa(psb_nnz_) = nzb @@ -141,7 +140,6 @@ subroutine psb_dspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) end if call psb_fixcoo(b,info) call psb_sp_trim(b,info) - call psb_sp_free(tmp,info) call psb_erractionrestore(err_act) return diff --git a/base/serial/psb_dspgtblk.f90 b/base/serial/psb_dspgtblk.f90 index 31862114..e5b5662f 100644 --- a/base/serial/psb_dspgtblk.f90 +++ b/base/serial/psb_dspgtblk.f90 @@ -38,7 +38,7 @@ !* appending to B). Output is always COO. Input might be anything, * !* * !***************************************************************************** -subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw) +subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw,srt) ! Output is always in COO format into B, irrespective of ! the input format use psb_spmat_type @@ -54,10 +54,11 @@ subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw) logical, intent(in), optional :: append integer, intent(in), target, optional :: iren(:) integer, intent(in), optional :: lrw + logical, intent(in), optional :: srt - logical :: append_ - integer :: i,j,k,ip,jp,nr,idx, nz,iret,nzb, nza, lrw_, irw_, err_act - character(len=20) :: name, ch_err + logical :: append_,srt_ + integer :: i,j,k,ip,jp,nr,idx, nz,iret,nzb, nza, lrw_, irw_, err_act + character(len=20) :: name, ch_err name='psb_spgtblk' info = 0 @@ -79,6 +80,12 @@ subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw) append_ = .false. endif + if (present(srt)) then + srt_ = srt + else + srt_ = .true. + endif + if (append_) then nzb = b%infoa(psb_nnz_) @@ -103,7 +110,7 @@ subroutine psb_dspgtblk(irw,a,b,info,append,iren,lrw) b%infoa(psb_nnz_) = nzb+nz b%m = b%m+lrw_-irw+1 b%k = max(b%k,a%k) - + if (srt_) call psb_fixcoo(b,info) !!$ call psb_erractionrestore(err_act) return diff --git a/base/serial/psb_zspclip.f90 b/base/serial/psb_zspclip.f90 index 449cc63b..8e480a06 100644 --- a/base/serial/psb_zspclip.f90 +++ b/base/serial/psb_zspclip.f90 @@ -48,21 +48,19 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) logical, intent(in), optional :: rscale,cscale integer :: lrw_, ierr(5), err_act - type(psb_zspmat_type) :: tmp character(len=20) :: name, ch_err integer :: imin_,imax_,jmin_,jmax_ logical :: rscale_,cscale_ integer :: sizeb, nzb, mb, kb, ifst, ilst, nrt, nzt, i, j - integer, parameter :: irbk=40, inzr=16 - + integer, parameter :: irbk=40 + name='psb_zsp_clip' info = 0 call psb_erractionsave(err_act) call psb_set_erraction(0) call psb_nullify_sp(b) - call psb_sp_all(tmp,inzr*irbk,info) - + if (present(imin)) then imin_ = imin @@ -116,16 +114,17 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) nrt = min(irbk,imax_-i+1) ifst = i ilst = ifst + nrt - 1 - call psb_sp_getblk(ifst,a,tmp,info,lrw=ilst) - nzt = psb_sp_get_nnzeros(tmp) - do j=1, nzt - if ((jmin_ <= tmp%ia2(j)).and.(tmp%ia2(j) <= jmax_)) then + call psb_sp_getrow(ifst,a,nzt,b%ia1,b%ia2,b%aspk,info,& + & lrw=ilst, append=.true.,nzin=nzb) + do j=nzb+1,nzb+nzt + if ((jmin_ <= b%ia2(j)).and.(b%ia2(j) <= jmax_)) then nzb = nzb + 1 - b%aspk(nzb) = tmp%aspk(j) - b%ia1(nzb) = tmp%ia1(j) - b%ia2(nzb) = tmp%ia2(j) + b%aspk(nzb) = b%aspk(j) + b%ia1(nzb) = b%ia1(j) + b%ia2(nzb) = b%ia2(j) end if end do + end do b%infoa(psb_nnz_) = nzb @@ -141,7 +140,6 @@ subroutine psb_zspclip(a,b,info,imin,imax,jmin,jmax,rscale,cscale) end if call psb_fixcoo(b,info) call psb_sp_trim(b,info) - call psb_sp_free(tmp,info) call psb_erractionrestore(err_act) return diff --git a/base/serial/psb_zspgtblk.f90 b/base/serial/psb_zspgtblk.f90 index a3b25faa..d65ce115 100644 --- a/base/serial/psb_zspgtblk.f90 +++ b/base/serial/psb_zspgtblk.f90 @@ -38,7 +38,7 @@ !* appending to B). Output is always COO. Input might be anything, * !* * !***************************************************************************** -subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw) +subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw,srt) ! Output is always in COO format into B, irrespective of ! the input format use psb_spmat_type @@ -54,10 +54,11 @@ subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw) logical, intent(in), optional :: append integer, intent(in), target, optional :: iren(:) integer, intent(in), optional :: lrw + logical, intent(in), optional :: srt - logical :: append_ - integer :: i,j,k,ip,jp,nr,idx, nz,iret,nzb, nza, lrw_, irw_, err_act - character(len=20) :: name, ch_err + logical :: append_,srt_ + integer :: i,j,k,ip,jp,nr,idx, nz,iret,nzb, nza, lrw_, irw_, err_act + character(len=20) :: name, ch_err name='psb_spgtblk' info = 0 @@ -79,6 +80,12 @@ subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw) append_ = .false. endif + if (present(srt)) then + srt_ = srt + else + srt_ = .true. + endif + if (append_) then nzb = b%infoa(psb_nnz_) @@ -103,7 +110,7 @@ subroutine psb_zspgtblk(irw,a,b,info,append,iren,lrw) b%infoa(psb_nnz_) = nzb+nz b%m = b%m+lrw_-irw+1 b%k = max(b%k,a%k) - + if (srt_) call psb_fixcoo(b,info) !!$ call psb_erractionrestore(err_act) return diff --git a/base/tools/psb_dsphalo.F90 b/base/tools/psb_dsphalo.F90 index cb7dbeab..cbbd7dbd 100644 --- a/base/tools/psb_dsphalo.F90 +++ b/base/tools/psb_dsphalo.F90 @@ -73,11 +73,11 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) Integer :: np,me,counter,proc,i, & & n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,& & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& - & nrmin,data_ - Type(psb_dspmat_type) :: tmp + & nrmin,data_,ngtz Integer :: l1, icomm, err_act Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), & - & rvsz(:), bsdindx(:),sdsz(:) + & rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:) + real(kind(1.d0)), allocatable :: valsnd(:) integer, pointer :: idxv(:) logical :: rwcnv_,clcnv_,cliprow_ character(len=5) :: outfmt_ @@ -220,11 +220,9 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) end if mat_recv = iszr iszs=sum(sdsz) - call psb_nullify_sp(tmp) - call psb_sp_all(0,0,tmp,max(iszs,1),info) - tmp%fida='COO' - call psb_sp_setifld(psb_spmat_asb_,psb_state_,tmp,info) - + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == 0) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == 0) call psb_ensure_size(max(iszs,1),valsnd,info) if (debugprt) then open(20+me) @@ -240,22 +238,23 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) counter=1 idx = 0 + tot_elem=0 Do proc=idxv(counter) if (proc == -1) exit n_el_recv=idxv(counter+psb_n_elem_recv_) counter=counter+n_el_recv n_el_send=idxv(counter+psb_n_elem_send_) - tot_elem=0 Do j=0,n_el_send-1 idx = idxv(counter+psb_elem_send_+j) n_elem = psb_sp_get_nnz_row(idx,a) - - call psb_sp_getblk(idx,a,tmp,info,append=.true.) + + call psb_sp_getrow(idx,a,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem) if (info /= 0) then info=4010 - ch_err='psb_sp_getblk' + ch_err='psb_sp_getrow' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if @@ -266,28 +265,23 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) counter = counter+n_el_send+3 Enddo - nz = tmp%infoa(psb_nnz_) + nz = tot_elem - if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I') - if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I') + if (rwcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (clcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') if (info /= 0) then info=4010 ch_err='psb_loc_to_glob' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - if (debugprt) then - open(30+me) - call psb_csprt(30+me,tmp,head='% SPHALO border SEND .') - close(30+me) - end if - call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_precision,& + call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_precision,& & blk%aspk,rvsz,brvindx,mpi_double_precision,icomm,info) - call mpi_alltoallv(tmp%ia1,sdsz,bsdindx,mpi_integer,& + call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,& & blk%ia1,rvsz,brvindx,mpi_integer,icomm,info) - call mpi_alltoallv(tmp%ia2,sdsz,bsdindx,mpi_integer,& + call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,& & blk%ia2,rvsz,brvindx,mpi_integer,icomm,info) if (info /= 0) then info=4010 @@ -379,15 +373,9 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) !!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t6-t2,t7-t6,t8-t7,t3-t8 !!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t2-t1,t3-t2,t4-t3,t5-t4 - Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,stat=info) + Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) - call psb_sp_free(tmp,info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_free' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if call psb_erractionrestore(err_act) return diff --git a/base/tools/psb_zsphalo.F90 b/base/tools/psb_zsphalo.F90 index a56eb5bc..f0c52e4e 100644 --- a/base/tools/psb_zsphalo.F90 +++ b/base/tools/psb_zsphalo.F90 @@ -72,12 +72,12 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) ! ...local scalars.... Integer :: np,me,counter,proc,i, & & n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,& - & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz, & - & nrmin,data_ - Type(psb_zspmat_type) :: tmp + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & nrmin,data_,ngtz Integer :: l1, icomm, err_act Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), & - & rvsz(:), bsdindx(:),sdsz(:) + & rvsz(:), bsdindx(:),sdsz(:), iasnd(:), jasnd(:) + complex(kind(1.d0)), allocatable :: valsnd(:) integer, pointer :: idxv(:) logical :: rwcnv_,clcnv_,cliprow_ character(len=5) :: outfmt_ @@ -220,11 +220,17 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) end if mat_recv = iszr iszs=sum(sdsz) - call psb_nullify_sp(tmp) - call psb_sp_all(0,0,tmp,max(iszs,1),info) - tmp%fida='COO' - call psb_sp_setifld(psb_spmat_asb_,psb_state_,tmp,info) + call psb_ensure_size(max(iszs,1),iasnd,info) + if (info == 0) call psb_ensure_size(max(iszs,1),jasnd,info) + if (info == 0) call psb_ensure_size(max(iszs,1),valsnd,info) + if (debugprt) then + open(20+me) + do i=1, psb_cd_get_local_cols(desc_a) + write(20+me,*) i,desc_a%loc_to_glob(i) + end do + close(20+me) + end if t2 = psb_wtime() l1 = 0 @@ -232,22 +238,23 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) counter=1 idx = 0 + tot_elem=0 Do proc=idxv(counter) if (proc == -1) exit n_el_recv=idxv(counter+psb_n_elem_recv_) counter=counter+n_el_recv n_el_send=idxv(counter+psb_n_elem_send_) - tot_elem=0 Do j=0,n_el_send-1 idx = idxv(counter+psb_elem_send_+j) n_elem = psb_sp_get_nnz_row(idx,a) - - call psb_sp_getblk(idx,a,tmp,info,append=.true.) + + call psb_sp_getrow(idx,a,ngtz,iasnd,jasnd,valsnd,info,& + & append=.true.,nzin=tot_elem) if (info /= 0) then info=4010 - ch_err='psb_sp_getblk' + ch_err='psb_sp_getrow' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if @@ -258,28 +265,23 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) counter = counter+n_el_send+3 Enddo - nz = tmp%infoa(psb_nnz_) + nz = tot_elem - if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I') - if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I') + if (rwcnv_) call psb_loc_to_glob(iasnd(1:nz),desc_a,info,iact='I') + if (clcnv_) call psb_loc_to_glob(jasnd(1:nz),desc_a,info,iact='I') if (info /= 0) then info=4010 ch_err='psb_loc_to_glob' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - if (debugprt) then - open(30+me) - call psb_csprt(30+me,tmp,head='% SPHALO border SEND .') - close(30+me) - end if - call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_complex,& + call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_complex,& & blk%aspk,rvsz,brvindx,mpi_double_complex,icomm,info) - call mpi_alltoallv(tmp%ia1,sdsz,bsdindx,mpi_integer,& + call mpi_alltoallv(iasnd,sdsz,bsdindx,mpi_integer,& & blk%ia1,rvsz,brvindx,mpi_integer,icomm,info) - call mpi_alltoallv(tmp%ia2,sdsz,bsdindx,mpi_integer,& + call mpi_alltoallv(jasnd,sdsz,bsdindx,mpi_integer,& & blk%ia2,rvsz,brvindx,mpi_integer,icomm,info) if (info /= 0) then info=4010 @@ -371,15 +373,9 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) !!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t6-t2,t7-t6,t8-t7,t3-t8 !!$ write(0,'(i3,1x,a,4(1x,g14.5))') me,'DSPHALO timings:',t2-t1,t3-t2,t4-t3,t5-t4 - Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,stat=info) + Deallocate(sdid,brvindx,rvid,bsdindx,rvsz,sdsz,& + & iasnd,jasnd,valsnd,stat=info) - call psb_sp_free(tmp,info) - if (info /= 0) then - info=4010 - ch_err='psb_sp_free' - call psb_errpush(info,name,a_err=ch_err) - goto 9999 - end if call psb_erractionrestore(err_act) return