From 5ed6b7970b366694f5f62d31d3fee1f799277fa2 Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Wed, 21 Feb 2007 20:54:47 +0000 Subject: [PATCH] Fixes for complex halo gather. --- Makefile | 4 +- base/modules/psb_tools_mod.f90 | 5 +- base/serial/psb_zcsprt.f90 | 54 ++++++++++++++-- base/serial/psb_zspgtblk.f90 | 1 - base/tools/psb_dcdovr.f90 | 1 - base/tools/psb_dsphalo.f90 | 6 +- base/tools/psb_zcdovr.f90 | 5 +- base/tools/psb_zsphalo.f90 | 109 ++++++++++++++++++++++++--------- 8 files changed, 137 insertions(+), 48 deletions(-) diff --git a/Makefile b/Makefile index 4d6e1793..9f3949af 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ include Make.inc -PREC=../mld2p4 -#PREC=prec +#PREC=../mld2p4-dev +PREC=prec library: ( [ -d lib ] || mkdir lib) diff --git a/base/modules/psb_tools_mod.f90 b/base/modules/psb_tools_mod.f90 index 002d4f55..23ef6cf7 100644 --- a/base/modules/psb_tools_mod.f90 +++ b/base/modules/psb_tools_mod.f90 @@ -142,15 +142,16 @@ Module psb_tools_mod character(len=5), optional :: outfmt integer, intent(in), optional :: data end Subroutine psb_dsphalo - Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) + Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) use psb_descriptor_type use psb_spmat_type Type(psb_zspmat_type),Intent(in) :: a Type(psb_zspmat_type),Intent(inout) :: blk Type(psb_desc_type),Intent(in) :: desc_a integer, intent(out) :: info - logical, optional, intent(in) :: rwcnv,clcnv + logical, optional, intent(in) :: rwcnv,clcnv,cliprow character(len=5), optional :: outfmt + integer, intent(in), optional :: data end Subroutine psb_zsphalo end interface diff --git a/base/serial/psb_zcsprt.f90 b/base/serial/psb_zcsprt.f90 index ee369eb0..bdac1c00 100644 --- a/base/serial/psb_zcsprt.f90 +++ b/base/serial/psb_zcsprt.f90 @@ -74,13 +74,16 @@ subroutine psb_zcsprt(iout,a,iv,eirs,eics,head,ivr,ivc) if (present(head)) then write(iout,'(a)') '%%MatrixMarket matrix coordinate complex general' write(iout,'(a,a)') '% ',head - write(iout,'(a)') '%' + write(iout,'(a)') '%' + write(iout,'(a,a)') '% ',toupper(a%fida) endif - if (toupper(a%fida)=='CSR') then + select case(toupper(a%fida)) + + case ('CSR') write(iout,*) a%m,a%k,a%ia2(a%m+1)-1 - + if (present(iv)) then do i=1, a%m do j=a%ia2(i),a%ia2(i+1)-1 @@ -115,7 +118,45 @@ subroutine psb_zcsprt(iout,a,iv,eirs,eics,head,ivr,ivc) endif endif - else if (toupper(a%fida)=='COO') then + case ('CSC') + + write(iout,*) a%m,a%k,a%ia2(a%k+1)-1 + + if (present(iv)) then + do i=1, a%k + do j=a%ia2(i),a%ia2(i+1)-1 + write(iout,frmtr) iv(irs+a%ia1(j)),iv(ics+i),a%aspk(j) + enddo + enddo + else + if (present(ivr).and..not.present(ivc)) then + do i=1, a%k + do j=a%ia2(i),a%ia2(i+1)-1 + write(iout,frmtr) ivr(irs+a%ia1(j)),(ics+i),a%aspk(j) + enddo + enddo + else if (present(ivr).and.present(ivc)) then + do i=1, a%k + do j=a%ia2(i),a%ia2(i+1)-1 + write(iout,frmtr) ivr(irs+a%ia1(j)),ivc(ics+i),a%aspk(j) + enddo + enddo + else if (.not.present(ivr).and.present(ivc)) then + do i=1, a%m + do j=a%ia2(i),a%ia2(i+1)-1 + write(iout,frmtr) (irs+a%ia1(j)),ivc(ics+i),a%aspk(j) + enddo + enddo + else if (.not.present(ivr).and..not.present(ivc)) then + do i=1, a%k + do j=a%ia2(i),a%ia2(i+1)-1 + write(iout,frmtr) (irs+a%ia1(j)),(ics+i),a%aspk(j) + enddo + enddo + endif + endif + + case ('COO') if (present(ivr).and..not.present(ivc)) then write(iout,*) a%m,a%k,a%infoa(psb_nnz_) @@ -138,7 +179,8 @@ subroutine psb_zcsprt(iout,a,iv,eirs,eics,head,ivr,ivc) write(iout,frmtr) a%ia1(j),a%ia2(j),a%aspk(j) enddo endif - else + case default write(0,*) 'Feeling lazy today, format not implemented: "',a%fida,'"' - endif + end select + close(iout) end subroutine psb_zcsprt diff --git a/base/serial/psb_zspgtblk.f90 b/base/serial/psb_zspgtblk.f90 index 02f2cd0a..32deedf0 100644 --- a/base/serial/psb_zspgtblk.f90 +++ b/base/serial/psb_zspgtblk.f90 @@ -216,7 +216,6 @@ contains k=0 do i=irw,lrw - do j=a%ia2(i),a%ia2(i+1)-1 k = k + 1 b%ia1(nzb+k) = i diff --git a/base/tools/psb_dcdovr.f90 b/base/tools/psb_dcdovr.f90 index 335d18df..9be55a44 100644 --- a/base/tools/psb_dcdovr.f90 +++ b/base/tools/psb_dcdovr.f90 @@ -55,7 +55,6 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info, extype) use psb_realloc_mod use psi_mod use mpi - Implicit None ! .. Array Arguments .. diff --git a/base/tools/psb_dsphalo.f90 b/base/tools/psb_dsphalo.f90 index 421ba053..220c1295 100644 --- a/base/tools/psb_dsphalo.f90 +++ b/base/tools/psb_dsphalo.f90 @@ -51,7 +51,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) use psb_serial_mod use psb_descriptor_type use psb_realloc_mod - use psb_tools_mod, only : psb_glob_to_loc, psb_loc_to_glob + use psb_tools_mod, psb_protect_name => psb_dsphalo use psb_error_mod use psb_penv_mod use mpi @@ -67,8 +67,8 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) ! ...local scalars.... Integer :: np,me,counter,proc,i, & & n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,& - & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,nrmin,& - & data_ + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& + & nrmin,data_ Type(psb_dspmat_type) :: tmp Integer :: l1, icomm, err_act Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), & diff --git a/base/tools/psb_zcdovr.f90 b/base/tools/psb_zcdovr.f90 index 9b11860b..ed10fd2a 100644 --- a/base/tools/psb_zcdovr.f90 +++ b/base/tools/psb_zcdovr.f90 @@ -164,7 +164,6 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info, extype) index_dim = size(desc_a%halo_index) elem_dim = size(desc_a%halo_index) - l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1) l_tmp_halo = novr*(3*Size(desc_a%halo_index)) @@ -711,8 +710,8 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info, extype) ! ! At this point we have gathered all the indices in the halo at ! N levels of overlap. Just call icdasb forcing to use - ! the halo_index provided. This is - ! the same routine as gets called inside CDASB. + ! the halo_index provided. This is the same routine as gets + ! called inside CDASB. ! if (debug) then diff --git a/base/tools/psb_zsphalo.f90 b/base/tools/psb_zsphalo.f90 index e796ea89..d550810b 100644 --- a/base/tools/psb_zsphalo.f90 +++ b/base/tools/psb_zsphalo.f90 @@ -45,12 +45,13 @@ !* * !* * !***************************************************************************** -Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) +Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,cliprow,outfmt,data) + use psb_const_mod use psb_serial_mod use psb_descriptor_type use psb_realloc_mod - use psb_tools_mod, only : psb_glob_to_loc, psb_loc_to_glob + use psb_tools_mod, psb_protect_name => psb_zsphalo use psb_error_mod use psb_penv_mod use mpi @@ -58,21 +59,24 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) Type(psb_zspmat_type),Intent(in) :: a Type(psb_zspmat_type),Intent(inout) :: blk - Type(psb_desc_type),Intent(in) :: desc_a + Type(psb_desc_type),Intent(in), target :: desc_a integer, intent(out) :: info - logical, optional, intent(in) :: rwcnv,clcnv + logical, optional, intent(in) :: rwcnv,clcnv,cliprow character(len=5), optional :: outfmt + integer, intent(in), optional :: data ! ...local scalars.... Integer :: np,me,counter,proc,i, & & n_el_send,k,n_el_recv,ictxt, idx, r, tot_elem,& - & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz + & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz, & + & nrmin,data_ Type(psb_zspmat_type) :: tmp Integer :: l1, icomm, err_act Integer, allocatable :: sdid(:,:), brvindx(:),rvid(:,:), & & rvsz(:), bsdindx(:),sdsz(:) - logical :: rwcnv_,clcnv_ + integer, pointer :: idxv(:) + logical :: rwcnv_,clcnv_,cliprow_ character(len=5) :: outfmt_ - Logical,Parameter :: debug=.false., usea2av=.true. + Logical,Parameter :: debug=.false., debugprt=.false. real(kind(1.d0)) :: t1,t2,t3,t4,t5,t6,t7,t8,t9 character(len=20) :: name, ch_err @@ -92,9 +96,19 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) else clcnv_ = .true. endif + if (present(cliprow)) then + cliprow_ = cliprow + else + cliprow_ = .false. + endif + if (present(data)) then + data_ = data + else + data_ = psb_comm_halo_ + endif if (present(outfmt)) then - call touppers(outfmt,outfmt_) + outfmt_ = toupper(outfmt) else outfmt_ = 'CSR' endif @@ -115,30 +129,45 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) If (debug) Write(0,*)'dsphalo',me + select case(data_) + case(psb_comm_halo_) + idxv => desc_a%halo_index + + case(psb_comm_ovr_) + idxv => desc_a%ovrlap_index + + case(psb_comm_ext_) + idxv => desc_a%ext_index + + case default + call psb_errpush(4010,name,a_err='wrong Data selector') + goto 9999 + end select + l1 = 0 sdsz(:)=0 rvsz(:)=0 ipx = 1 - blk%k = a%k - blk%m = 0 brvindx(ipx) = 0 bsdindx(ipx) = 0 counter=1 idx = 0 idxs = 0 idxr = 0 + blk%k = a%k + blk%m = 0 ! For all rows in the halo descriptor, extract and send/receive. Do - proc=desc_a%halo_index(counter) + proc=idxv(counter) if (proc == -1) exit - n_el_recv = desc_a%halo_index(counter+psb_n_elem_recv_) + n_el_recv = idxv(counter+psb_n_elem_recv_) counter = counter+n_el_recv - n_el_send = desc_a%halo_index(counter+psb_n_elem_send_) + n_el_send = idxv(counter+psb_n_elem_send_) tot_elem = 0 Do j=0,n_el_send-1 - idx = desc_a%halo_index(counter+psb_elem_send_+j) + idx = idxv(counter+psb_elem_send_+j) n_elem = psb_sp_get_nnz_row(idx,a) tot_elem = tot_elem+n_elem Enddo @@ -162,11 +191,11 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) idxr = 0 counter = 1 Do - proc=desc_a%halo_index(counter) + proc=idxv(counter) if (proc == -1) exit - n_el_recv = desc_a%halo_index(counter+psb_n_elem_recv_) + n_el_recv = idxv(counter+psb_n_elem_recv_) counter = counter+n_el_recv - n_el_send = desc_a%halo_index(counter+psb_n_elem_send_) + n_el_send = idxv(counter+psb_n_elem_send_) bsdindx(proc+1) = idxs idxs = idxs + sdsz(proc+1) @@ -199,15 +228,15 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) idx = 0 Do - proc=desc_a%halo_index(counter) + proc=idxv(counter) if (proc == -1) exit - n_el_recv=desc_a%halo_index(counter+psb_n_elem_recv_) + n_el_recv=idxv(counter+psb_n_elem_recv_) counter=counter+n_el_recv - n_el_send=desc_a%halo_index(counter+psb_n_elem_send_) + n_el_send=idxv(counter+psb_n_elem_send_) tot_elem=0 Do j=0,n_el_send-1 - idx = desc_a%halo_index(counter+psb_elem_send_+j) + idx = idxv(counter+psb_elem_send_+j) n_elem = psb_sp_get_nnz_row(idx,a) call psb_sp_getblk(idx,a,tmp,info,append=.true.) @@ -225,8 +254,6 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) counter = counter+n_el_send+3 Enddo nz = tmp%infoa(psb_nnz_) -!!$ call csprt(20+me,tmp,head='% SPHALO border SEND .') -!!$ close(20+me) if (rwcnv_) call psb_loc_to_glob(tmp%ia1(1:nz),desc_a,info,iact='I') if (clcnv_) call psb_loc_to_glob(tmp%ia2(1:nz),desc_a,info,iact='I') @@ -236,8 +263,11 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) call psb_errpush(info,name,a_err=ch_err) goto 9999 end if -!!$ call csprt(30+me,tmp,head='% SPHALO border SEND .') -!!$ close(30+me) + if (debugprt) then + open(30+me) + call psb_csprt(30+me,tmp,head='% SPHALO border SEND .') + close(30+me) + end if call mpi_alltoallv(tmp%aspk,sdsz,bsdindx,mpi_double_complex,& @@ -259,8 +289,9 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) ! ! Convert into local numbering ! - if (rwcnv_) call psb_glob_to_loc(blk%ia1(1:iszr),desc_a,info,iact='I') + if (rwcnv_) call psb_glob_to_loc(blk%ia1(1:iszr),desc_a,info,iact='I',owned=cliprow_) if (clcnv_) call psb_glob_to_loc(blk%ia2(1:iszr),desc_a,info,iact='I') + if (info /= 0) then info=4010 ch_err='psbglob_to_loc' @@ -268,24 +299,38 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) goto 9999 end if + if (debugprt) then + blk%fida='COO' + blk%infoa(psb_nnz_)=iszr + open(40+me) + call psb_csprt(40+me,blk,head='% SPHALO border .') + close(40+me) + end if l1 = 0 + blk%m=0 + nrmin=max(0,a%m) Do i=1,iszr !!$ write(0,*) work5(i),work6(i) r=(blk%ia1(i)) k=(blk%ia2(i)) - If (k.Gt.0) Then + If ((r>nrmin).and.(k>0)) Then l1=l1+1 blk%aspk(l1) = blk%aspk(i) blk%ia1(l1) = r blk%ia2(l1) = k blk%k = max(blk%k,k) + blk%m = max(blk%m,r) End If Enddo blk%fida='COO' blk%infoa(psb_nnz_)=l1 -!!$ open(50+me) -!!$ call csprt(50+me,blk,head='% SPHALO border .') -!!$ close(50+me) + blk%m = blk%m - a%m + if (debugprt) then + open(50+me) + call psb_csprt(50+me,blk,head='% SPHALO border .') + close(50+me) + call psb_barrier(ictxt) + end if t4 = psb_wtime() if(debug) Write(0,*)me,'End first loop',counter,l1,blk%m @@ -308,6 +353,10 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rwcnv,clcnv,outfmt) ! Do nothing! case default write(0,*) 'Error in DSPHALO : invalid outfmt "',outfmt_,'"' + info=4010 + ch_err='Bad outfmt' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 end select t5 = psb_wtime()