Better allocation of temporaries in SPHALO.

merge-paraggr
Salvatore Filippone 6 years ago
parent b2904e6809
commit a9f4470d34

@ -80,7 +80,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& n_el_send,k,n_el_recv,idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,&
& l1, err_act, nsnds, nrcvs, nrg, ncg
& l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -181,13 +181,11 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(izero,a%get_ncols())
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract the row size
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -201,7 +199,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
@ -240,13 +238,17 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr = sum(rvsz)
mat_recv = iszr
iszs = sum(sdsz)
lnnz = max(iszr,iszs,ione)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -266,7 +268,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=ipdxv(counter)
@ -352,7 +354,6 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
! This is necessary when sphalo is used to compute a transpose,
! as opposed to just gathering halo for spspmm purposes.
!
ncg = huge(ncg)
tot_elem = 0
Do
proc = ipdxv(counter)
@ -365,7 +366,7 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = ipdxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem,jmax=ncg)
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
call psb_errpush(info,name,a_err='psb_sp_getrow')
@ -505,9 +506,9 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
integer(psb_ipk_) :: counter, proc, i, &
& n_el_send,n_el_recv,&
& n_elem, j, ipx,mat_recv, idxs,idxr,nz,&
& data_,totxch,nxs, nxr
& data_,totxch,nxs, nxr, ncg
integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, &
& lidx, l1, lnr, lnc, idx, ngtz, tot_elem
& lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -602,13 +603,11 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
lnc = a%get_ncols()
call acoo%allocate(lzero,lnc)
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract and send/receive.
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -622,10 +621,16 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
!
! Exchange row sizes, so as to know sends/receives.
! This is different from the halo exchange because the
! size of the rows may vary, as opposed to fixed
! (multi) vector row size.
!
call mpi_alltoall(sdsz,1,psb_mpi_mpk_,&
& rvsz,1,psb_mpi_mpk_,icomm,minfo)
if (info /= psb_success_) then
@ -651,13 +656,17 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr=sum(1_psb_lpk_*rvsz)
mat_recv = iszr
iszs=sum(1_psb_lpk_*sdsz)
lnnz = max(iszr,iszs,lone)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -667,16 +676,12 @@ Subroutine psb_lcsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
goto 9999
end if
if (info /= psb_success_) then
info=psb_err_from_subroutine_; ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err); goto 9999
end if
l1 = 0
ipx = 1
counter=1
idx = 0
ncg = huge(ncg)
tot_elem=0
Do
proc=ipdxv(counter)

@ -80,7 +80,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& n_el_send,k,n_el_recv,idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,&
& l1, err_act, nsnds, nrcvs, nrg, ncg
& l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -181,13 +181,11 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(izero,a%get_ncols())
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract the row size
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -201,7 +199,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
@ -240,13 +238,17 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr = sum(rvsz)
mat_recv = iszr
iszs = sum(sdsz)
lnnz = max(iszr,iszs,ione)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -266,7 +268,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=ipdxv(counter)
@ -352,7 +354,6 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
! This is necessary when sphalo is used to compute a transpose,
! as opposed to just gathering halo for spspmm purposes.
!
ncg = huge(ncg)
tot_elem = 0
Do
proc = ipdxv(counter)
@ -365,7 +366,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = ipdxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem,jmax=ncg)
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
call psb_errpush(info,name,a_err='psb_sp_getrow')
@ -505,9 +506,9 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
integer(psb_ipk_) :: counter, proc, i, &
& n_el_send,n_el_recv,&
& n_elem, j, ipx,mat_recv, idxs,idxr,nz,&
& data_,totxch,nxs, nxr
& data_,totxch,nxs, nxr, ncg
integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, &
& lidx, l1, lnr, lnc, idx, ngtz, tot_elem
& lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -602,13 +603,11 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
lnc = a%get_ncols()
call acoo%allocate(lzero,lnc)
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract and send/receive.
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -622,10 +621,16 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
!
! Exchange row sizes, so as to know sends/receives.
! This is different from the halo exchange because the
! size of the rows may vary, as opposed to fixed
! (multi) vector row size.
!
call mpi_alltoall(sdsz,1,psb_mpi_mpk_,&
& rvsz,1,psb_mpi_mpk_,icomm,minfo)
if (info /= psb_success_) then
@ -651,13 +656,17 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr=sum(1_psb_lpk_*rvsz)
mat_recv = iszr
iszs=sum(1_psb_lpk_*sdsz)
lnnz = max(iszr,iszs,lone)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -667,16 +676,12 @@ Subroutine psb_ldsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
goto 9999
end if
if (info /= psb_success_) then
info=psb_err_from_subroutine_; ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err); goto 9999
end if
l1 = 0
ipx = 1
counter=1
idx = 0
ncg = huge(ncg)
tot_elem=0
Do
proc=ipdxv(counter)

@ -80,7 +80,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& n_el_send,k,n_el_recv,idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,&
& l1, err_act, nsnds, nrcvs, nrg, ncg
& l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -181,13 +181,11 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(izero,a%get_ncols())
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract the row size
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -201,7 +199,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
@ -240,13 +238,17 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr = sum(rvsz)
mat_recv = iszr
iszs = sum(sdsz)
lnnz = max(iszr,iszs,ione)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -266,7 +268,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=ipdxv(counter)
@ -352,7 +354,6 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
! This is necessary when sphalo is used to compute a transpose,
! as opposed to just gathering halo for spspmm purposes.
!
ncg = huge(ncg)
tot_elem = 0
Do
proc = ipdxv(counter)
@ -365,7 +366,7 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = ipdxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem,jmax=ncg)
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
call psb_errpush(info,name,a_err='psb_sp_getrow')
@ -505,9 +506,9 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
integer(psb_ipk_) :: counter, proc, i, &
& n_el_send,n_el_recv,&
& n_elem, j, ipx,mat_recv, idxs,idxr,nz,&
& data_,totxch,nxs, nxr
& data_,totxch,nxs, nxr, ncg
integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, &
& lidx, l1, lnr, lnc, idx, ngtz, tot_elem
& lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -602,13 +603,11 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
lnc = a%get_ncols()
call acoo%allocate(lzero,lnc)
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract and send/receive.
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -622,10 +621,16 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
!
! Exchange row sizes, so as to know sends/receives.
! This is different from the halo exchange because the
! size of the rows may vary, as opposed to fixed
! (multi) vector row size.
!
call mpi_alltoall(sdsz,1,psb_mpi_mpk_,&
& rvsz,1,psb_mpi_mpk_,icomm,minfo)
if (info /= psb_success_) then
@ -651,13 +656,17 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr=sum(1_psb_lpk_*rvsz)
mat_recv = iszr
iszs=sum(1_psb_lpk_*sdsz)
lnnz = max(iszr,iszs,lone)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -667,16 +676,12 @@ Subroutine psb_lssphalo(a,desc_a,blk,info,rowcnv,colcnv,&
goto 9999
end if
if (info /= psb_success_) then
info=psb_err_from_subroutine_; ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err); goto 9999
end if
l1 = 0
ipx = 1
counter=1
idx = 0
ncg = huge(ncg)
tot_elem=0
Do
proc=ipdxv(counter)

@ -80,7 +80,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
& n_el_send,k,n_el_recv,idx, r, tot_elem,&
& n_elem, j, ipx,mat_recv, iszs, iszr,idxs,idxr,nz,&
& irmin,icmin,irmax,icmax,data_,ngtz,totxch,nxs, nxr,&
& l1, err_act, nsnds, nrcvs, nrg, ncg
& l1, err_act, nsnds, nrcvs, lnr, lnc, lnnz
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -181,13 +181,11 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
call acoo%allocate(izero,a%get_ncols())
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract the row size
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -201,7 +199,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
@ -240,13 +238,17 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr = sum(rvsz)
mat_recv = iszr
iszs = sum(sdsz)
lnnz = max(iszr,iszs,ione)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -266,7 +268,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
ipx = 1
counter=1
idx = 0
tot_elem=0
Do
proc=ipdxv(counter)
@ -352,7 +354,6 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
! This is necessary when sphalo is used to compute a transpose,
! as opposed to just gathering halo for spspmm purposes.
!
ncg = huge(ncg)
tot_elem = 0
Do
proc = ipdxv(counter)
@ -365,7 +366,7 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = ipdxv(counter+psb_elem_send_+j)
n_elem = a%get_nz_row(idx)
call a%csget(idx,idx,ngtz,iasnd,jasnd,valsnd,info,&
& append=.true.,nzin=tot_elem,jmax=ncg)
& append=.true.,nzin=tot_elem)
if (info /= psb_success_) then
info=psb_err_from_subroutine_
call psb_errpush(info,name,a_err='psb_sp_getrow')
@ -505,9 +506,9 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
integer(psb_ipk_) :: counter, proc, i, &
& n_el_send,n_el_recv,&
& n_elem, j, ipx,mat_recv, idxs,idxr,nz,&
& data_,totxch,nxs, nxr
& data_,totxch,nxs, nxr, ncg
integer(psb_lpk_) :: r, k, irmin, irmax, icmin, icmax, iszs, iszr, &
& lidx, l1, lnr, lnc, idx, ngtz, tot_elem
& lidx, l1, lnr, lnc, lnnz, idx, ngtz, tot_elem
integer(psb_mpk_) :: icomm, minfo
integer(psb_mpk_), allocatable :: brvindx(:), &
& rvsz(:), bsdindx(:),sdsz(:)
@ -602,13 +603,11 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
idx = 0
idxs = 0
idxr = 0
lnc = a%get_ncols()
call acoo%allocate(lzero,lnc)
call desc_a%get_list(data_,pdxv,totxch,nxr,nxs,info)
ipdxv = pdxv%get_vect()
! For all rows in the halo descriptor, extract and send/receive.
lnr = 0
Do
proc=ipdxv(counter)
if (proc == -1) exit
@ -622,10 +621,16 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
tot_elem = tot_elem+n_elem
Enddo
sdsz(proc+1) = tot_elem
call acoo%set_nrows(acoo%get_nrows() + n_el_recv)
lnr = lnr + n_el_recv
counter = counter+n_el_send+3
Enddo
!
! Exchange row sizes, so as to know sends/receives.
! This is different from the halo exchange because the
! size of the rows may vary, as opposed to fixed
! (multi) vector row size.
!
call mpi_alltoall(sdsz,1,psb_mpi_mpk_,&
& rvsz,1,psb_mpi_mpk_,icomm,minfo)
if (info /= psb_success_) then
@ -651,13 +656,17 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
counter = counter+n_el_send+3
Enddo
iszr=sum(rvsz)
call acoo%reallocate(max(iszr,1))
iszr=sum(1_psb_lpk_*rvsz)
mat_recv = iszr
iszs=sum(1_psb_lpk_*sdsz)
lnnz = max(iszr,iszs,lone)
lnc = a%get_ncols()
call acoo%allocate(lnr,lnc,lnnz)
if (debug_level >= psb_debug_outer_)&
& write(debug_unit,*) me,' ',trim(name),': Sizes:',acoo%get_size(),&
& ' Send:',sdsz(:),' Receive:',rvsz(:)
mat_recv = iszr
iszs=sum(sdsz)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),iasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),jasnd,info)
if (info == psb_success_) call psb_ensure_size(max(iszs,1),valsnd,info)
@ -667,16 +676,12 @@ Subroutine psb_lzsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
goto 9999
end if
if (info /= psb_success_) then
info=psb_err_from_subroutine_; ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err); goto 9999
end if
l1 = 0
ipx = 1
counter=1
idx = 0
ncg = huge(ncg)
tot_elem=0
Do
proc=ipdxv(counter)

Loading…
Cancel
Save