@ -70,7 +70,7 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep
integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k
logical :: keepnum_, keeploc_
integer(psb_mpk_) :: ictxt,np,me
integer(psb_mpk_) :: icomm, minfo, ndx
integer(psb_mpk_) :: icomm, minfo, ndx, root_
integer(psb_mpk_), allocatable :: nzbr(:), idisp(:)
integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:)
integer(psb_ipk_) :: ierr(5)
@ -98,7 +98,12 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep
keeploc_ = .true.
end if
call globa%free()
if (present(root)) then
root_ = root
root_ = -1
end if
if ((root_ == -1).or.(root_ == me)) call globa%free()
if (keepnum_) then
nrg = desc_a%get_global_rows()
@ -137,27 +142,48 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep
call psb_errpush(info,name); goto 9999
end if
if ((root_ == -1).or.(root_ == me)) then
if (info == psb_success_) call psb_realloc(nzg,glbia,info)
if (info == psb_success_) call psb_realloc(nzg,glbja,info)
if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg)
if (info == psb_success_) call psb_realloc(1,glbia,info)
if (info == psb_success_) call psb_realloc(1,glbja,info)
if (info == psb_success_) call glob_coo%allocate(1,1,1)
end if
if (info == psb_success_) call psb_realloc(nzg,glbia,info)
if (info == psb_success_) call psb_realloc(nzg,glbja,info)
if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg)
if (info /= psb_success_) goto 9999
do ip=1,np
idisp(ip) = sum(nzbr(1:ip-1))
ndx = nzbr(me+1)
call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(locia,ndx,psb_mpi_lpk_,&
& glbia,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(locja,ndx,psb_mpi_lpk_,&
& glbja,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (root_ == -1) then
call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(locia,ndx,psb_mpi_lpk_,&
& glbia,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(locja,ndx,psb_mpi_lpk_,&
& glbja,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,root_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_gatherv(locia,ndx,psb_mpi_lpk_,&
& glbia,nzbr,idisp,&
& psb_mpi_lpk_,root_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_gatherv(locja,ndx,psb_mpi_lpk_,&
& glbja,nzbr,idisp,&
& psb_mpi_lpk_,root_,icomm,minfo)
end if
if (minfo /= psb_success_) then
info = minfo
@ -172,11 +198,13 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep
! for very large cases it does not make sense to
! gather the matrix on a single procecss anyway...
glob_coo%ia(1:nzg) = glbia(1:nzg)
glob_coo%ja(1:nzg) = glbja(1:nzg)
call glob_coo%set_nzeros(nzg)
if (present(dupl)) call glob_coo%set_dupl(dupl)
call globa%mv_from(glob_coo)
if ((root_ == -1).or.(root_ == me)) then
glob_coo%ia(1:nzg) = glbia(1:nzg)
glob_coo%ja(1:nzg) = glbja(1:nzg)
call glob_coo%set_nzeros(nzg)
if (present(dupl)) call glob_coo%set_dupl(dupl)
call globa%mv_from(glob_coo)
end if
deallocate(glbia,glbja, stat=info)
@ -228,7 +256,7 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee
integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl
logical :: keepnum_, keeploc_
integer(psb_mpk_) :: ictxt,np,me
integer(psb_mpk_) :: icomm, minfo, ndx
integer(psb_mpk_) :: icomm, minfo, ndx, root_
integer(psb_mpk_), allocatable :: nzbr(:), idisp(:)
integer(psb_lpk_), allocatable :: lnzbr(:)
integer(psb_ipk_) :: ierr(5)
@ -256,7 +284,13 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee
keeploc_ = .true.
end if
call globa%free()
if (present(root)) then
root_ = root
root_ = -1
end if
if ((root_ == -1).or.(root_ == me)) call globa%free()
if (keepnum_) then
nrg = desc_a%get_global_rows()
@ -294,7 +328,11 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee
call psb_errpush(info,name); goto 9999
end if
if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg)
if ((root_ == -1).or.(root_ == me)) then
if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg)
if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_)
end if
if (info /= psb_success_) goto 9999
@ -304,18 +342,31 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee
idisp(ip) = sum(nzbr(1:ip-1))
ndx = nzbr(me+1)
call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,&
& glob_coo%ia,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,&
& glob_coo%ja,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (root_ == -1) then
call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,&
& glob_coo%ia,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,&
& glob_coo%ja,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,root_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,&
& glob_coo%ia,nzbr,idisp,&
& psb_mpi_lpk_,root_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,&
& glob_coo%ja,nzbr,idisp,&
& psb_mpi_lpk_,root_,icomm,minfo)
end if
if (minfo /= psb_success_) then
info = minfo
call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv')
@ -328,9 +379,11 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee
! for very large cases it does not make sense to
! gather the matrix on a single procecss anyway...
call glob_coo%set_nzeros(nzg)
if (present(dupl)) call glob_coo%set_dupl(dupl)
call globa%mv_from(glob_coo)
if ((root_ == -1).or.(root_ == me)) then
call glob_coo%set_nzeros(nzg)
if (present(dupl)) call glob_coo%set_dupl(dupl)
call globa%mv_from(glob_coo)
end if
write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_
@ -378,7 +431,7 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k
integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl
logical :: keepnum_, keeploc_
integer(psb_mpk_) :: ictxt,np,me
integer(psb_mpk_) :: icomm, minfo, ndx
integer(psb_mpk_) :: icomm, minfo, ndx, root_
integer(psb_mpk_), allocatable :: nzbr(:), idisp(:)
integer(psb_lpk_), allocatable :: lnzbr(:)
integer(psb_ipk_) :: ierr(5)
@ -406,7 +459,13 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k
keeploc_ = .true.
end if
call globa%free()
if (present(root)) then
root_ = root
root_ = -1
end if
if ((root_ == -1).or.(root_ == me)) call globa%free()
if (keepnum_) then
nrg = desc_a%get_global_rows()
@ -444,25 +503,43 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k
call psb_errpush(info,name); goto 9999
end if
if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg)
if ((root_ == -1).or.(root_ == me)) then
if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg)
if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_)
end if
if (info /= psb_success_) goto 9999
do ip=1,np
idisp(ip) = sum(nzbr(1:ip-1))
ndx = nzbr(me+1)
call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,&
& glob_coo%ia,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,&
& glob_coo%ja,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (root_ == -1) then
call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,&
& glob_coo%ia,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,&
& glob_coo%ja,nzbr,idisp,&
& psb_mpi_lpk_,icomm,minfo)
call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_spk_,&
& glob_coo%val,nzbr,idisp,&
& psb_mpi_c_spk_,root_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,&
& glob_coo%ia,nzbr,idisp,&
& psb_mpi_lpk_,root_,icomm,minfo)
if (minfo == psb_success_) call &
& mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,&
& glob_coo%ja,nzbr,idisp,&
& psb_mpi_lpk_,root_,icomm,minfo)
end if
if (minfo /= psb_success_) then
info = minfo
call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv')
@ -470,9 +547,11 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k
end if
call loc_coo%free()
call glob_coo%set_nzeros(nzg)
if (present(dupl)) call glob_coo%set_dupl(dupl)
call globa%mv_from(glob_coo)
if ((root_ == -1).or.(root_ == me)) then
call glob_coo%set_nzeros(nzg)
if (present(dupl)) call glob_coo%set_dupl(dupl)
call globa%mv_from(glob_coo)
end if
write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_