From a9f4470d3484656a49dd0f2d312b3970740f057b Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Wed, 1 May 2019 15:07:58 +0100 Subject: [PATCH] Better allocation of temporaries in SPHALO. --- base/tools/psb_csphalo.F90 | 61 +++++++++++++++++++++----------------- base/tools/psb_dsphalo.F90 | 61 +++++++++++++++++++++----------------- base/tools/psb_ssphalo.F90 | 61 +++++++++++++++++++++----------------- base/tools/psb_zsphalo.F90 | 61 +++++++++++++++++++++----------------- 4 files changed, 132 insertions(+), 112 deletions(-) diff --git a/base/tools/psb_csphalo.F90 b/base/tools/psb_csphalo.F90 index efa42fd1..854d65f4 100644 --- a/base/tools/psb_csphalo.F90 +++ b/base/tools/psb_csphalo.F90 @@ -80,7 +80,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& & n_el_send,k,n_el_recv,idx, r, tot_elem,& & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& - & l1, err_act, nsnds, nrcvs, nrg, ncg + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -181,13 +181,11 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - - call acoo%allocate(izero,a%get_ncols()) - - + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract the row size + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -201,7 +199,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo @@ -240,13 +238,17 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -266,7 +268,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& ipx = 1 counter=1 idx = 0 - + tot_elem=0 Do proc=ipdxv(counter) @@ -352,7 +354,6 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& ! This is necessary when sphalo is used to compute a transpose, ! as opposed to just gathering halo for spspmm purposes. ! - ncg = huge(ncg) tot_elem = 0 Do proc = ipdxv(counter) @@ -365,7 +366,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = ipdxv(counter+psb_elem_send_+j) n_elem = a%get_nz_row(idx) call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& - & append=.true.,nzin=tot_elem,jmax=ncg) + & append=.true.,nzin=tot_elem) if (info /= psb_success_) then info=psb_err_from_subroutine_ call psb_errpush(info,name,a_err='psb_sp_getrow') @@ -505,9 +506,9 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& integer(psb_ipk_) :: counter, proc, i, & & n_el_send,n_el_recv,& & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& - & data_,totxch,nxs, nxr + & data_,totxch,nxs, nxr, ncg integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & - & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -602,13 +603,11 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - lnc = a%get_ncols() - call acoo%allocate(lzero,lnc) - call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract and send/receive. + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -622,10 +621,16 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& & rvsz,1,psb_mpi_mpk_,icomm,minfo) if (info /= psb_success_) then @@ -651,13 +656,17 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr=sum(1_psb_lpk_*rvsz) + mat_recv = iszr + iszs=sum(1_psb_lpk_*sdsz) + + lnnz = max(iszr,iszs,lone) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -667,16 +676,12 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,& goto 9999 end if - if (info /= psb_success_) then - info=psb_err_from_subroutine_; ch_err='psb_sp_reall' - call psb_errpush(info,name,a_err=ch_err); goto 9999 - end if - l1 = 0 ipx = 1 counter=1 idx = 0 + ncg = huge(ncg) tot_elem=0 Do proc=ipdxv(counter) diff --git a/base/tools/psb_dsphalo.F90 b/base/tools/psb_dsphalo.F90 index e050a834..94f512c7 100644 --- a/base/tools/psb_dsphalo.F90 +++ b/base/tools/psb_dsphalo.F90 @@ -80,7 +80,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& & n_el_send,k,n_el_recv,idx, r, tot_elem,& & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& - & l1, err_act, nsnds, nrcvs, nrg, ncg + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -181,13 +181,11 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - - call acoo%allocate(izero,a%get_ncols()) - - + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract the row size + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -201,7 +199,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo @@ -240,13 +238,17 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -266,7 +268,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& ipx = 1 counter=1 idx = 0 - + tot_elem=0 Do proc=ipdxv(counter) @@ -352,7 +354,6 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& ! This is necessary when sphalo is used to compute a transpose, ! as opposed to just gathering halo for spspmm purposes. ! - ncg = huge(ncg) tot_elem = 0 Do proc = ipdxv(counter) @@ -365,7 +366,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = ipdxv(counter+psb_elem_send_+j) n_elem = a%get_nz_row(idx) call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& - & append=.true.,nzin=tot_elem,jmax=ncg) + & append=.true.,nzin=tot_elem) if (info /= psb_success_) then info=psb_err_from_subroutine_ call psb_errpush(info,name,a_err='psb_sp_getrow') @@ -505,9 +506,9 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& integer(psb_ipk_) :: counter, proc, i, & & n_el_send,n_el_recv,& & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& - & data_,totxch,nxs, nxr + & data_,totxch,nxs, nxr, ncg integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & - & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -602,13 +603,11 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - lnc = a%get_ncols() - call acoo%allocate(lzero,lnc) - call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract and send/receive. + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -622,10 +621,16 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& & rvsz,1,psb_mpi_mpk_,icomm,minfo) if (info /= psb_success_) then @@ -651,13 +656,17 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr=sum(1_psb_lpk_*rvsz) + mat_recv = iszr + iszs=sum(1_psb_lpk_*sdsz) + + lnnz = max(iszr,iszs,lone) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -667,16 +676,12 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,& goto 9999 end if - if (info /= psb_success_) then - info=psb_err_from_subroutine_; ch_err='psb_sp_reall' - call psb_errpush(info,name,a_err=ch_err); goto 9999 - end if - l1 = 0 ipx = 1 counter=1 idx = 0 + ncg = huge(ncg) tot_elem=0 Do proc=ipdxv(counter) diff --git a/base/tools/psb_ssphalo.F90 b/base/tools/psb_ssphalo.F90 index 0be93d24..fa200bb0 100644 --- a/base/tools/psb_ssphalo.F90 +++ b/base/tools/psb_ssphalo.F90 @@ -80,7 +80,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& & n_el_send,k,n_el_recv,idx, r, tot_elem,& & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& - & l1, err_act, nsnds, nrcvs, nrg, ncg + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -181,13 +181,11 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - - call acoo%allocate(izero,a%get_ncols()) - - + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract the row size + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -201,7 +199,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo @@ -240,13 +238,17 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -266,7 +268,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& ipx = 1 counter=1 idx = 0 - + tot_elem=0 Do proc=ipdxv(counter) @@ -352,7 +354,6 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& ! This is necessary when sphalo is used to compute a transpose, ! as opposed to just gathering halo for spspmm purposes. ! - ncg = huge(ncg) tot_elem = 0 Do proc = ipdxv(counter) @@ -365,7 +366,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = ipdxv(counter+psb_elem_send_+j) n_elem = a%get_nz_row(idx) call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& - & append=.true.,nzin=tot_elem,jmax=ncg) + & append=.true.,nzin=tot_elem) if (info /= psb_success_) then info=psb_err_from_subroutine_ call psb_errpush(info,name,a_err='psb_sp_getrow') @@ -505,9 +506,9 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& integer(psb_ipk_) :: counter, proc, i, & & n_el_send,n_el_recv,& & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& - & data_,totxch,nxs, nxr + & data_,totxch,nxs, nxr, ncg integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & - & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -602,13 +603,11 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - lnc = a%get_ncols() - call acoo%allocate(lzero,lnc) - call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract and send/receive. + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -622,10 +621,16 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& & rvsz,1,psb_mpi_mpk_,icomm,minfo) if (info /= psb_success_) then @@ -651,13 +656,17 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr=sum(1_psb_lpk_*rvsz) + mat_recv = iszr + iszs=sum(1_psb_lpk_*sdsz) + + lnnz = max(iszr,iszs,lone) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -667,16 +676,12 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,& goto 9999 end if - if (info /= psb_success_) then - info=psb_err_from_subroutine_; ch_err='psb_sp_reall' - call psb_errpush(info,name,a_err=ch_err); goto 9999 - end if - l1 = 0 ipx = 1 counter=1 idx = 0 + ncg = huge(ncg) tot_elem=0 Do proc=ipdxv(counter) diff --git a/base/tools/psb_zsphalo.F90 b/base/tools/psb_zsphalo.F90 index 1d516ec3..ced01ba4 100644 --- a/base/tools/psb_zsphalo.F90 +++ b/base/tools/psb_zsphalo.F90 @@ -80,7 +80,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& & n_el_send,k,n_el_recv,idx, r, tot_elem,& & n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,& & irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,& - & l1, err_act, nsnds, nrcvs, nrg, ncg + & l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -181,13 +181,11 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - - call acoo%allocate(izero,a%get_ncols()) - - + call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract the row size + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -201,7 +199,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo @@ -240,13 +238,17 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr = sum(rvsz) + mat_recv = iszr + iszs = sum(sdsz) + + lnnz = max(iszr,iszs,ione) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -266,7 +268,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& ipx = 1 counter=1 idx = 0 - + tot_elem=0 Do proc=ipdxv(counter) @@ -352,7 +354,6 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& ! This is necessary when sphalo is used to compute a transpose, ! as opposed to just gathering halo for spspmm purposes. ! - ncg = huge(ncg) tot_elem = 0 Do proc = ipdxv(counter) @@ -365,7 +366,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = ipdxv(counter+psb_elem_send_+j) n_elem = a%get_nz_row(idx) call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,& - & append=.true.,nzin=tot_elem,jmax=ncg) + & append=.true.,nzin=tot_elem) if (info /= psb_success_) then info=psb_err_from_subroutine_ call psb_errpush(info,name,a_err='psb_sp_getrow') @@ -505,9 +506,9 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& integer(psb_ipk_) :: counter, proc, i, & & n_el_send,n_el_recv,& & n_elem, j, ipx,mat_recv, idxs,idxr,nz,& - & data_,totxch,nxs, nxr + & data_,totxch,nxs, nxr, ncg integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, & - & lidx, l1, lnr, lnc, idx, ngtz, tot_elem + & lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem integer(psb_mpk_) :: icomm, minfo integer(psb_mpk_), allocatable :: brvindx(:), & & rvsz(:), bsdindx(:),sdsz(:) @@ -602,13 +603,11 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& idx = 0 idxs = 0 idxr = 0 - lnc = a%get_ncols() - call acoo%allocate(lzero,lnc) - call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info) ipdxv = pdxv%get_vect() ! For all rows in the halo descriptor, extract and send/receive. + lnr = 0 Do proc=ipdxv(counter) if (proc == -1) exit @@ -622,10 +621,16 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& tot_elem = tot_elem+n_elem Enddo sdsz(proc+1) = tot_elem - call acoo%set_nrows(acoo%get_nrows() + n_el_recv) + lnr = lnr + n_el_recv counter = counter+n_el_send+3 Enddo + ! + ! Exchange row sizes, so as to know sends/receives. + ! This is different from the halo exchange because the + ! size of the rows may vary, as opposed to fixed + ! (multi) vector row size. + ! call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& & rvsz,1,psb_mpi_mpk_,icomm,minfo) if (info /= psb_success_) then @@ -651,13 +656,17 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& counter = counter+n_el_send+3 Enddo - iszr=sum(rvsz) - call acoo%reallocate(max(iszr,1)) + iszr=sum(1_psb_lpk_*rvsz) + mat_recv = iszr + iszs=sum(1_psb_lpk_*sdsz) + + lnnz = max(iszr,iszs,lone) + lnc = a%get_ncols() + call acoo%allocate(lnr,lnc,lnnz) if (debug_level >= psb_debug_outer_)& & write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),& & ' Send:',sdsz(:),' Receive:',rvsz(:) - mat_recv = iszr - iszs=sum(sdsz) + if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info) if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info) @@ -667,16 +676,12 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,& goto 9999 end if - if (info /= psb_success_) then - info=psb_err_from_subroutine_; ch_err='psb_sp_reall' - call psb_errpush(info,name,a_err=ch_err); goto 9999 - end if - l1 = 0 ipx = 1 counter=1 idx = 0 + ncg = huge(ncg) tot_elem=0 Do proc=ipdxv(counter)