Fixes for complex halo gather.

psblas3-type-indexed
Salvatore Filippone 18 years ago
parent 6093d6a685
commit 5ed6b7970b

@ -1,6 +1,6 @@
include Make.inc include Make.inc
PREC=../mld2p4 #PREC=../mld2p4-dev
#PREC=prec PREC=prec
library: library:
( [ -d lib ] || mkdir lib) ( [ -d lib ] || mkdir lib)

@ -142,15 +142,16 @@ Module psb_tools_mod
character(len=5), optional :: outfmt character(len=5), optional :: outfmt
integer, intent(in), optional :: data integer, intent(in), optional :: data
end Subroutine psb_dsphalo end Subroutine psb_dsphalo
Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
use psb_descriptor_type use psb_descriptor_type
use psb_spmat_type use psb_spmat_type
Type(psb_zspmat_type),Intent(in) :: a Type(psb_zspmat_type),Intent(in) :: a
Type(psb_zspmat_type),Intent(inout) :: blk Type(psb_zspmat_type),Intent(inout) :: blk
Type(psb_desc_type),Intent(in) :: desc_a Type(psb_desc_type),Intent(in) :: desc_a
integer, intent(out) :: info integer, intent(out) :: info
logical, optional, intent(in) :: rwcnv,clcnv logical, optional, intent(in) :: rwcnv,clcnv,cliprow
character(len=5), optional :: outfmt character(len=5), optional :: outfmt
integer, intent(in), optional :: data
end Subroutine psb_zsphalo end Subroutine psb_zsphalo
end interface end interface

@ -75,9 +75,12 @@ subroutine psb_zcsprt(iout,a,iv,eirs,eics,head,ivr,ivc)
write(iout,'(a)') '%%MatrixMarket matrix coordinate complex general' write(iout,'(a)') '%%MatrixMarket matrix coordinate complex general'
write(iout,'(a,a)') '% ',head write(iout,'(a,a)') '% ',head
write(iout,'(a)') '%' write(iout,'(a)') '%'
write(iout,'(a,a)') '% ',toupper(a%fida)
endif endif
if (toupper(a%fida)=='CSR') then select case(toupper(a%fida))
case ('CSR')
write(iout,*) a%m,a%k,a%ia2(a%m+1)-1 write(iout,*) a%m,a%k,a%ia2(a%m+1)-1
@ -115,7 +118,45 @@ subroutine psb_zcsprt(iout,a,iv,eirs,eics,head,ivr,ivc)
endif endif
endif endif
else if (toupper(a%fida)=='COO') then case ('CSC')
write(iout,*) a%m,a%k,a%ia2(a%k+1)-1
if (present(iv)) then
do i=1, a%k
do j=a%ia2(i),a%ia2(i+1)-1
write(iout,frmtr) iv(irs+a%ia1(j)),iv(ics+i),a%aspk(j)
enddo
enddo
else
if (present(ivr).and..not.present(ivc)) then
do i=1, a%k
do j=a%ia2(i),a%ia2(i+1)-1
write(iout,frmtr) ivr(irs+a%ia1(j)),(ics+i),a%aspk(j)
enddo
enddo
else if (present(ivr).and.present(ivc)) then
do i=1, a%k
do j=a%ia2(i),a%ia2(i+1)-1
write(iout,frmtr) ivr(irs+a%ia1(j)),ivc(ics+i),a%aspk(j)
enddo
enddo
else if (.not.present(ivr).and.present(ivc)) then
do i=1, a%m
do j=a%ia2(i),a%ia2(i+1)-1
write(iout,frmtr) (irs+a%ia1(j)),ivc(ics+i),a%aspk(j)
enddo
enddo
else if (.not.present(ivr).and..not.present(ivc)) then
do i=1, a%k
do j=a%ia2(i),a%ia2(i+1)-1
write(iout,frmtr) (irs+a%ia1(j)),(ics+i),a%aspk(j)
enddo
enddo
endif
endif
case ('COO')
if (present(ivr).and..not.present(ivc)) then if (present(ivr).and..not.present(ivc)) then
write(iout,*) a%m,a%k,a%infoa(psb_nnz_) write(iout,*) a%m,a%k,a%infoa(psb_nnz_)
@ -138,7 +179,8 @@ subroutine psb_zcsprt(iout,a,iv,eirs,eics,head,ivr,ivc)
write(iout,frmtr) a%ia1(j),a%ia2(j),a%aspk(j) write(iout,frmtr) a%ia1(j),a%ia2(j),a%aspk(j)
enddo enddo
endif endif
else case default
write(0,*) 'Feeling lazy today, format not implemented: "',a%fida,'"' write(0,*) 'Feeling lazy today, format not implemented: "',a%fida,'"'
endif end select
close(iout)
end subroutine psb_zcsprt end subroutine psb_zcsprt

@ -216,7 +216,6 @@ contains
k=0 k=0
do i=irw,lrw do i=irw,lrw
do j=a%ia2(i),a%ia2(i+1)-1 do j=a%ia2(i),a%ia2(i+1)-1
k = k + 1 k = k + 1
b%ia1(nzb+k) = i b%ia1(nzb+k) = i

@ -55,7 +55,6 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info, extype)
use psb_realloc_mod use psb_realloc_mod
use psi_mod use psi_mod
use mpi use mpi
Implicit None Implicit None
! .. Array Arguments .. ! .. Array Arguments ..

@ -51,7 +51,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
use psb_serial_mod use psb_serial_mod
use psb_descriptor_type use psb_descriptor_type
use psb_realloc_mod use psb_realloc_mod
use psb_tools_mod, only : psb_glob_to_loc, psb_loc_to_glob use psb_tools_mod, psb_protect_name => psb_dsphalo
use psb_error_mod use psb_error_mod
use psb_penv_mod use psb_penv_mod
use mpi use mpi
@ -67,8 +67,8 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
! ...local scalars.... ! ...local scalars....
Integer :: np,me,counter,proc,i, & Integer :: np,me,counter,proc,i, &
& n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,& & n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,nrmin,& & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& data_ & nrmin,data_
Type(psb_dspmat_type) :: tmp Type(psb_dspmat_type) :: tmp
Integer :: l1, icomm, err_act Integer :: l1, icomm, err_act
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), & Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &

@ -164,7 +164,6 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info, extype)
index_dim = size(desc_a%halo_index) index_dim = size(desc_a%halo_index)
elem_dim = size(desc_a%halo_index) elem_dim = size(desc_a%halo_index)
l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1) l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1)
l_tmp_halo = novr*(3*Size(desc_a%halo_index)) l_tmp_halo = novr*(3*Size(desc_a%halo_index))
@ -711,8 +710,8 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info, extype)
! !
! At this point we have gathered all the indices in the halo at ! At this point we have gathered all the indices in the halo at
! N levels of overlap. Just call icdasb forcing to use ! N levels of overlap. Just call icdasb forcing to use
! the halo_index provided. This is ! the halo_index provided. This is the same routine as gets
! the same routine as gets called inside CDASB. ! called inside CDASB.
! !
if (debug) then if (debug) then

@ -45,12 +45,13 @@
!* * !* *
!* * !* *
!***************************************************************************** !*****************************************************************************
Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data)
use psb_const_mod
use psb_serial_mod use psb_serial_mod
use psb_descriptor_type use psb_descriptor_type
use psb_realloc_mod use psb_realloc_mod
use psb_tools_mod, only : psb_glob_to_loc, psb_loc_to_glob use psb_tools_mod, psb_protect_name => psb_zsphalo
use psb_error_mod use psb_error_mod
use psb_penv_mod use psb_penv_mod
use mpi use mpi
@ -58,21 +59,24 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
Type(psb_zspmat_type),Intent(in) :: a Type(psb_zspmat_type),Intent(in) :: a
Type(psb_zspmat_type),Intent(inout) :: blk Type(psb_zspmat_type),Intent(inout) :: blk
Type(psb_desc_type),Intent(in) :: desc_a Type(psb_desc_type),Intent(in), target :: desc_a
integer, intent(out) :: info integer, intent(out) :: info
logical, optional, intent(in) :: rwcnv,clcnv logical, optional, intent(in) :: rwcnv,clcnv,cliprow
character(len=5), optional :: outfmt character(len=5), optional :: outfmt
integer, intent(in), optional :: data
! ...local scalars.... ! ...local scalars....
Integer :: np,me,counter,proc,i, & Integer :: np,me,counter,proc,i, &
& n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,& & n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz, &
& nrmin,data_
Type(psb_zspmat_type) :: tmp Type(psb_zspmat_type) :: tmp
Integer :: l1, icomm, err_act Integer :: l1, icomm, err_act
Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), & Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), &
& rvsz(:), bsdindx(:),sdsz(:) & rvsz(:), bsdindx(:),sdsz(:)
logical :: rwcnv_,clcnv_ integer, pointer :: idxv(:)
logical :: rwcnv_,clcnv_,cliprow_
character(len=5) :: outfmt_ character(len=5) :: outfmt_
Logical,Parameter :: debug=.false., usea2av=.true. Logical,Parameter :: debug=.false., debugprt=.false.
real(kind(1.d0)) :: t1,t2,t3,t4,t5,t6,t7,t8,t9 real(kind(1.d0)) :: t1,t2,t3,t4,t5,t6,t7,t8,t9
character(len=20) :: name, ch_err character(len=20) :: name, ch_err
@ -92,9 +96,19 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
else else
clcnv_ = .true. clcnv_ = .true.
endif endif
if (present(cliprow)) then
cliprow_ = cliprow
else
cliprow_ = .false.
endif
if (present(data)) then
data_ = data
else
data_ = psb_comm_halo_
endif
if (present(outfmt)) then if (present(outfmt)) then
call touppers(outfmt,outfmt_) outfmt_ = toupper(outfmt)
else else
outfmt_ = 'CSR' outfmt_ = 'CSR'
endif endif
@ -115,30 +129,45 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
If (debug) Write(0,*)'dsphalo',me If (debug) Write(0,*)'dsphalo',me
select case(data_)
case(psb_comm_halo_)
idxv => desc_a%halo_index
case(psb_comm_ovr_)
idxv => desc_a%ovrlap_index
case(psb_comm_ext_)
idxv => desc_a%ext_index
case default
call psb_errpush(4010,name,a_err='wrong Data selector')
goto 9999
end select
l1 = 0 l1 = 0
sdsz(:)=0 sdsz(:)=0
rvsz(:)=0 rvsz(:)=0
ipx = 1 ipx = 1
blk%k = a%k
blk%m = 0
brvindx(ipx) = 0 brvindx(ipx) = 0
bsdindx(ipx) = 0 bsdindx(ipx) = 0
counter=1 counter=1
idx = 0 idx = 0
idxs = 0 idxs = 0
idxr = 0 idxr = 0
blk%k = a%k
blk%m = 0
! For all rows in the halo descriptor, extract and send/receive. ! For all rows in the halo descriptor, extract and send/receive.
Do Do
proc=desc_a%halo_index(counter) proc=idxv(counter)
if (proc == -1) exit if (proc == -1) exit
n_el_recv = desc_a%halo_index(counter+psb_n_elem_recv_) n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv counter = counter+n_el_recv
n_el_send = desc_a%halo_index(counter+psb_n_elem_send_) n_el_send = idxv(counter+psb_n_elem_send_)
tot_elem = 0 tot_elem = 0
Do j=0,n_el_send-1 Do j=0,n_el_send-1
idx = desc_a%halo_index(counter+psb_elem_send_+j) idx = idxv(counter+psb_elem_send_+j)
n_elem = psb_sp_get_nnz_row(idx,a) n_elem = psb_sp_get_nnz_row(idx,a)
tot_elem = tot_elem+n_elem tot_elem = tot_elem+n_elem
Enddo Enddo
@ -162,11 +191,11 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
idxr = 0 idxr = 0
counter = 1 counter = 1
Do Do
proc=desc_a%halo_index(counter) proc=idxv(counter)
if (proc == -1) exit if (proc == -1) exit
n_el_recv = desc_a%halo_index(counter+psb_n_elem_recv_) n_el_recv = idxv(counter+psb_n_elem_recv_)
counter = counter+n_el_recv counter = counter+n_el_recv
n_el_send = desc_a%halo_index(counter+psb_n_elem_send_) n_el_send = idxv(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1) idxs = idxs + sdsz(proc+1)
@ -199,15 +228,15 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
idx = 0 idx = 0
Do Do
proc=desc_a%halo_index(counter) proc=idxv(counter)
if (proc == -1) exit if (proc == -1) exit
n_el_recv=desc_a%halo_index(counter+psb_n_elem_recv_) n_el_recv=idxv(counter+psb_n_elem_recv_)
counter=counter+n_el_recv counter=counter+n_el_recv
n_el_send=desc_a%halo_index(counter+psb_n_elem_send_) n_el_send=idxv(counter+psb_n_elem_send_)
tot_elem=0 tot_elem=0
Do j=0,n_el_send-1 Do j=0,n_el_send-1
idx = desc_a%halo_index(counter+psb_elem_send_+j) idx = idxv(counter+psb_elem_send_+j)
n_elem = psb_sp_get_nnz_row(idx,a) n_elem = psb_sp_get_nnz_row(idx,a)
call psb_sp_getblk(idx,a,tmp,info,append=.true.) call psb_sp_getblk(idx,a,tmp,info,append=.true.)
@ -225,8 +254,6 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
counter = counter+n_el_send+3 counter = counter+n_el_send+3
Enddo Enddo
nz = tmp%infoa(psb_nnz_) nz = tmp%infoa(psb_nnz_)
!!$ call csprt(20+me,tmp,head='% SPHALO border SEND .')
!!$ close(20+me)
if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I') if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I')
if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I') if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I')
@ -236,8 +263,11 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
call psb_errpush(info,name,a_err=ch_err) call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
!!$ call csprt(30+me,tmp,head='% SPHALO border SEND .') if (debugprt) then
!!$ close(30+me) open(30+me)
call psb_csprt(30+me,tmp,head='% SPHALO border SEND .')
close(30+me)
end if
call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_complex,& call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_complex,&
@ -259,8 +289,9 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
! !
! Convert into local numbering ! Convert into local numbering
! !
if (rwcnv_) call psb_glob_to_loc(blk%ia1(1:iszr),desc_a,info,iact='I') if (rwcnv_) call psb_glob_to_loc(blk%ia1(1:iszr),desc_a,info,iact='I',owned=cliprow_)
if (clcnv_) call psb_glob_to_loc(blk%ia2(1:iszr),desc_a,info,iact='I') if (clcnv_) call psb_glob_to_loc(blk%ia2(1:iszr),desc_a,info,iact='I')
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='psbglob_to_loc' ch_err='psbglob_to_loc'
@ -268,24 +299,38 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
goto 9999 goto 9999
end if end if
if (debugprt) then
blk%fida='COO'
blk%infoa(psb_nnz_)=iszr
open(40+me)
call psb_csprt(40+me,blk,head='% SPHALO border .')
close(40+me)
end if
l1 = 0 l1 = 0
blk%m=0
nrmin=max(0,a%m)
Do i=1,iszr Do i=1,iszr
!!$ write(0,*) work5(i),work6(i) !!$ write(0,*) work5(i),work6(i)
r=(blk%ia1(i)) r=(blk%ia1(i))
k=(blk%ia2(i)) k=(blk%ia2(i))
If (k.Gt.0) Then If ((r>nrmin).and.(k>0)) Then
l1=l1+1 l1=l1+1
blk%aspk(l1) = blk%aspk(i) blk%aspk(l1) = blk%aspk(i)
blk%ia1(l1) = r blk%ia1(l1) = r
blk%ia2(l1) = k blk%ia2(l1) = k
blk%k = max(blk%k,k) blk%k = max(blk%k,k)
blk%m = max(blk%m,r)
End If End If
Enddo Enddo
blk%fida='COO' blk%fida='COO'
blk%infoa(psb_nnz_)=l1 blk%infoa(psb_nnz_)=l1
!!$ open(50+me) blk%m = blk%m - a%m
!!$ call csprt(50+me,blk,head='% SPHALO border .') if (debugprt) then
!!$ close(50+me) open(50+me)
call psb_csprt(50+me,blk,head='% SPHALO border .')
close(50+me)
call psb_barrier(ictxt)
end if
t4 = psb_wtime() t4 = psb_wtime()
if(debug) Write(0,*)me,'End first loop',counter,l1,blk%m if(debug) Write(0,*)me,'End first loop',counter,l1,blk%m
@ -308,6 +353,10 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt)
! Do nothing! ! Do nothing!
case default case default
write(0,*) 'Error in DSPHALO : invalid outfmt "',outfmt_,'"' write(0,*) 'Error in DSPHALO : invalid outfmt "',outfmt_,'"'
info=4010
ch_err='Bad outfmt'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end select end select
t5 = psb_wtime() t5 = psb_wtime()

Loading…
Cancel
Save