diff --git a/mlprec/mld_daggrmat_minnrg_asb.F90 b/mlprec/mld_daggrmat_minnrg_asb.F90 index 5ae31b83..4e2ed84b 100644 --- a/mlprec/mld_daggrmat_minnrg_asb.F90 +++ b/mlprec/mld_daggrmat_minnrg_asb.F90 @@ -124,16 +124,16 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) & naggr, nzl,naggrm1,naggrp1, i, j, k, jd, icolF, nrt integer ::ictxt,np,me, err_act, icomm character(len=20) :: name - type(psb_dspmat_type) :: am1,am2, af, ptilde, rtilde + type(psb_dspmat_type) :: am1,am2, af, ptilde, rtilde, atran, atp, atdatp type(psb_dspmat_type) :: am3,am4, ap, adap,atmp,rada, ra, atmp2 real(psb_dpk_), allocatable :: adiag(:), pj(:), xj(:), yj(:), omf(:),omp(:),omi(:),& & oden(:), adinv(:) - logical :: ml_global_nmb, filter_mat + logical :: filter_mat integer :: debug_level, debug_unit integer, parameter :: ncmax=16 real(psb_dpk_) :: omega, anorm, tmp, dg, theta, alpha,beta, ommx - name='mld_aggrmat_minnrg_asb' + name='mld_aggrmat_minnrg' if(psb_get_errstatus().ne.0) return info=0 call psb_erractionsave(err_act) @@ -156,6 +156,9 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) call psb_nullify_sp(Adap) call psb_nullify_sp(Atmp) call psb_nullify_sp(Atmp2) + call psb_nullify_sp(Atran) + call psb_nullify_sp(Atp) + call psb_nullify_sp(atdatp) call psb_nullify_sp(AF) call psb_nullify_sp(ra) call psb_nullify_sp(rada) @@ -165,7 +168,7 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) nglob = psb_cd_get_global_rows(desc_a) nrow = psb_cd_get_local_rows(desc_a) ncol = psb_cd_get_local_cols(desc_a) - + theta = p%rprcparm(mld_aggr_thresh_) naggr = nlaggr(me+1) @@ -181,18 +184,15 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) naggrm1 = sum(nlaggr(1:me)) naggrp1 = sum(nlaggr(1:me+1)) - ml_global_nmb = .true. filter_mat = (p%iprcparm(mld_aggr_filter_) == mld_filter_mat_) - if (ml_global_nmb) then - ilaggr(1:nrow) = ilaggr(1:nrow) + naggrm1 - call psb_halo(ilaggr,desc_a,info) + ilaggr(1:nrow) = ilaggr(1:nrow) + naggrm1 + call psb_halo(ilaggr,desc_a,info) - if (info /= 0) then - call psb_errpush(4010,name,a_err='psb_halo') - goto 9999 - end if + if (info /= 0) then + call psb_errpush(4010,name,a_err='psb_halo') + goto 9999 end if ! naggr: number of local aggregates @@ -221,34 +221,21 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) ! 1. Allocate Ptilde in sparse matrix form ptilde%fida='COO' ptilde%m=ncol - if (ml_global_nmb) then - ptilde%k=ntaggr - call psb_sp_all(ncol,ntaggr,ptilde,ncol,info) - else - ptilde%k=naggr - call psb_sp_all(ncol,naggr,ptilde,ncol,info) - endif + ptilde%k=ntaggr + call psb_sp_all(ncol,ntaggr,ptilde,ncol,info) + if (info /= 0) then call psb_errpush(4010,name,a_err='spall') goto 9999 end if - if (ml_global_nmb) then - do i=1,ncol - ptilde%aspk(i) = done - ptilde%ia1(i) = i - ptilde%ia2(i) = ilaggr(i) - end do - ptilde%infoa(psb_nnz_) = ncol - else - do i=1,nrow - ptilde%aspk(i) = done - ptilde%ia1(i) = i - ptilde%ia2(i) = ilaggr(i) - end do - ptilde%infoa(psb_nnz_) = nrow - endif + do i=1,ncol + ptilde%aspk(i) = done + ptilde%ia1(i) = i + ptilde%ia2(i) = ilaggr(i) + end do + ptilde%infoa(psb_nnz_) = ncol call psb_spcnv(ptilde,info,afmt='csr',dupl=psb_dupl_add_) if (info==0) call psb_spcnv(a,am3,info,afmt='csr',dupl=psb_dupl_add_) @@ -267,7 +254,7 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) call psb_errpush(4010,name,a_err='symbmm 1') goto 9999 end if - + call psb_sp_clone(ap,atmp,info) @@ -287,7 +274,7 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) call psb_symbmm(am3,atmp,adap,info) call psb_numbmm(am3,atmp,adap) call psb_sp_free(atmp,info) - + !!$ write(0,*) 'Columns of AP',psb_sp_get_ncols(ap) !!$ write(0,*) 'Columns of ADAP',psb_sp_get_ncols(adap) call psb_spcnv(ap,info,afmt='coo') @@ -297,18 +284,19 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) if (info /= 0) then write(0,*) 'Failed conversion to CSC' end if - + call csc_mat_col_prod(ap,adap,omp,info) call csc_mat_col_prod(adap,adap,oden,info) call psb_sum(ictxt,omp) call psb_sum(ictxt,oden) - +!!$ write(debug_unit,*) trim(name),' OMP :',omp +!!$ write(debug_unit,*) trim(name),' ODEN:',oden omp = omp/oden !!$ write(0,*) 'Check on output prolongator ',omp(1:min(size(omp),10)) if (debug_level >= psb_debug_outer_) & & write(debug_unit,*) me,' ',trim(name),& & 'Done NUMBMM 1' - + ! Compute omega_int ommx = -1d300 do i=1, ncol @@ -323,7 +311,7 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) end do omf(i) = max(dzero,omf(i)) end do - + if (filter_mat) then ! @@ -364,10 +352,6 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) end if omf(1:nrow) = omf(1:nrow) * adinv(1:nrow) -!!$ if (filter_mat) call psb_sp_scal(adinv,af,info) -!!$ -!!$ call psb_sp_scal(adinv,am3,info) -!!$ if (info /= 0) goto 9999 if (filter_mat) then ! @@ -454,32 +438,145 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) ! ! Ok, let's start over with the restrictor ! - call psb_transp(ptilde,rtilde,fmt='CSR') + if (.false.) then + i = 4 + select case (i) + case(1) + + call psb_transp(ptilde,rtilde,fmt='CSR') + call psb_spcnv(a,atmp,info,afmt='CSR') + + am4%fida='COO' + am4%m=ncol-nrow + am4%k=ncol + call psb_sp_all(ncol,ntaggr,am4,ncol,info) + + do i=1,ncol-nrow + am4%aspk(i) = dzero + am4%ia1(i) = i + am4%ia2(i) = nrow+i + end do + call psb_sp_setifld(nrow-ncol,psb_nnz_,am4,info) + call psb_spcnv(am4,info,afmt='CSR') + if (info == 0) call psb_rwextd(ncol,atmp,info,b=am4) + if (info == 0) call psb_sp_free(am4,info) + + case(2) + + call psb_transp(ptilde,rtilde,fmt='CSR') + call psb_spcnv(a,atmp,info,afmt='CSR') + call psb_sphalo(atmp,desc_a,am4,info,& + & colcnv=.true.,rowscale=.true.) + nrt = psb_sp_get_nrows(am4) + call psb_sp_clip(am4,atmp2,info,1,nrt,1,ncol) + call psb_spcnv(atmp2,info,afmt='CSR') + atmp2%aspk(:) = dzero + if (info == 0) call psb_rwextd(ncol,atmp,info,b=atmp2) + if (info == 0) call psb_sp_free(am4,info) + if (info == 0) call psb_sp_free(atmp2,info) + + case (3) + + ! We are doing the product only on the local + ! rows, the non-local contributions will be handled + ! through the global sum. + call psb_transp(ptilde,am4,fmt='CSR') + nrt = psb_sp_get_nrows(am4) + call psb_sp_clip(am4,rtilde,info,1,nrt,1,nrow) + call psb_spcnv(a,atmp,info,afmt='CSR') + + case(4) + + call psb_transp(ptilde,rtilde,fmt='COO') + do i=1, psb_sp_get_nnzeros(rtilde) + if (rtilde%ia2(i) > nrow) then + rtilde%aspk(i) = dzero + end if + end do + call psb_spcnv(rtilde,info,afmt='CSR') + call psb_spcnv(a,atmp,info,afmt='CSR') + call psb_sphalo(atmp,desc_a,am4,info,& + & colcnv=.true.,rowscale=.true.) + nrt = psb_sp_get_nrows(am4) + call psb_sp_clip(am4,atmp2,info,1,nrt,1,ncol) + call psb_spcnv(atmp2,info,afmt='CSR') +!!$ atmp2%aspk(:) = dzero + if (info == 0) call psb_rwextd(ncol,atmp,info,b=atmp2) + if (info == 0) call psb_sp_free(am4,info) + if (info == 0) call psb_sp_free(atmp2,info) + + case default + write(0,*) 'Not building rtilde/atmp, this will blow up' + info = 4010 + goto 9999 + end select + + if (info == 0) call psb_symbmm(rtilde,atmp,ra,info) + if (info == 0) call psb_numbmm(rtilde,atmp,ra) + if (info /= 0) then + write(0,*) 'From symbmm 1:',info + goto 9999 + end if + call psb_sp_scal(adinv,atmp,info) + if (info == 0) call psb_symbmm(ra,atmp,rada,info) + if (info == 0) call psb_numbmm(ra,atmp,rada) + if (info /= 0) then + write(0,*) 'From symbmm 2:',info + goto 9999 + end if - call psb_spcnv(a,atmp,info,afmt='CSR') - call psb_sphalo(atmp,desc_a,am4,info,& - & colcnv=.true.,rowscale=.true.) - nrt = psb_sp_get_nrows(am4) - call psb_sp_clip(am4,atmp2,info,1,nrt,1,ncol) - call psb_spcnv(atmp2,info,afmt='CSR') - if (info == 0) call psb_rwextd(ncol,atmp,info,b=atmp2) - if (info == 0) call psb_sp_free(am4,info) - if (info == 0) call psb_sp_free(atmp2,info) - call psb_symbmm(rtilde,atmp,ra,info) - if (info == 0) call psb_numbmm(rtilde,atmp,ra) - if (info /= 0) then - write(0,*) 'From symbmm :',info - goto 9999 - end if - call psb_sp_scal(adinv,atmp,info) - call psb_symbmm(ra,atmp,rada,info) - call psb_numbmm(ra,atmp,rada) + call csr_mat_row_prod(ra,rada,omp,info) + call csr_mat_row_prod(rada,rada,oden,info) + call psb_sum(ictxt,omp) + call psb_sum(ictxt,oden) + else + + call psb_transp(ptilde,rtilde,fmt='CSR') + call psb_spcnv(a,atmp,info,afmt='CSR') + call psb_sphalo(atmp,desc_a,am4,info,& + & colcnv=.true.,rowscale=.true.) + nrt = psb_sp_get_nrows(am4) + call psb_sp_clip(am4,atmp2,info,1,nrt,1,ncol) + call psb_spcnv(atmp2,info,afmt='CSR') + if (info == 0) call psb_rwextd(ncol,atmp,info,b=atmp2) + if (info == 0) call psb_sp_free(am4,info) + if (info == 0) call psb_sp_free(atmp2,info) + ! This is to compute the transpose. It ONLY works if the + ! original A has a symmetric pattern. + call psb_transp(atmp,atmp2) + call psb_sp_clip(atmp2,atran,info,1,nrow,1,ncol) + call psb_sp_free(atmp2,info) + ! Now for the product. + call psb_symbmm(atran,ptilde,atp,info) + if (info == 0) call psb_numbmm(atran,ptilde,atp) + call psb_sp_clone(atp,atmp2,info) + call psb_sp_scal(adinv,atmp2,info) + call psb_sphalo(atmp2,desc_a,am4,info,& + & colcnv=.false.,rowscale=.true.,outfmt='CSR ') + if (info == 0) call psb_rwextd(ncol,atmp2,info,b=am4) + if (info == 0) call psb_sp_free(am4,info) + + call psb_symbmm(atran,atmp2,atdatp,info) + call psb_numbmm(atran,atmp2,atdatp) + call psb_sp_free(atmp2,info) + + call psb_spcnv(atp,info,afmt='coo') + if (info == 0) call psb_spcnv(atp,info,afmt='csc') + if (info == 0) call psb_spcnv(atdatp,info,afmt='coo') + if (info == 0) call psb_spcnv(atdatp,info,afmt='csc') + if (info /= 0) then + write(0,*) 'Failed conversion to CSC' + end if + + call csc_mat_col_prod(atp,atdatp,omp,info) + call csc_mat_col_prod(atdatp,atdatp,oden,info) + call psb_sum(ictxt,omp) + call psb_sum(ictxt,oden) + - call csr_mat_row_prod(ra,rada,omp,info) - call csr_mat_row_prod(rada,rada,oden,info) - call psb_sum(ictxt,omp) - call psb_sum(ictxt,oden) - + end if +!!$ write(debug_unit,*) trim(name),' OMP_R :',omp +!!$ write(debug_unit,*) trim(name),' ODEN_R:',oden omp = omp/oden !!$ write(0,*) 'Check on output restrictor',omp(1:min(size(omp),10)) ! Compute omega_int @@ -503,7 +600,7 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) end do omf(1:ncol) = omf(1:ncol)*adinv(1:ncol) call psb_sp_free(atmp2,info) - + if (psb_toupper(atmp%fida)=='CSR') then do i=1,atmp%m @@ -598,7 +695,7 @@ subroutine mld_daggrmat_minnrg_asb(a,desc_a,ilaggr,nlaggr,p,info) call psb_errpush(4001,name,a_err='Build b = am2 x am3') goto 9999 end if - + select case(p%iprcparm(mld_coarse_mat_)) @@ -769,13 +866,13 @@ contains end if csca = (psb_toupper(a%fida(1:3))=='CSC') cscb = (psb_toupper(b%fida(1:3))=='CSC') - + if (.not.(csca.and.cscb)) then write(0,*) 'Matrices A and B should be in CSC' info = -2 return end if - + do j=1, nc iap = a%ia2(j) nra = a%ia2(j+1)-iap @@ -805,13 +902,13 @@ contains end if csra = (psb_toupper(a%fida(1:3))=='CSR') csrb = (psb_toupper(b%fida(1:3))=='CSR') - + if (.not.(csra.and.csrb)) then write(0,*) 'Matrices A and B should be in CSR' info = -2 return end if - + do j=1, nr iap = a%ia2(j) nca = a%ia2(j+1)-iap @@ -822,7 +919,7 @@ contains end do end subroutine csr_mat_row_prod - + function sparse_srtd_dot(nv1,iv1,v1,nv2,iv2,v2) result(dot) integer, intent(in) :: nv1,nv2 integer, intent(in) :: iv1(:), iv2(:) @@ -830,7 +927,7 @@ contains real(psb_dpk_) :: dot integer :: i,j,k, ip1, ip2 - + dot = dzero ip1 = 1 ip2 = 1 @@ -848,7 +945,7 @@ contains ip2 = ip2 + 1 end if end do - + end function sparse_srtd_dot end subroutine mld_daggrmat_minnrg_asb