From 7c491f06f97250f93cdb852428515895ed4d79a1 Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Tue, 14 Jan 2020 08:22:05 +0000 Subject: [PATCH] Fixed implementation of SPGATHER --- base/comm/psb_cspgather.F90 | 205 +++++++++++++++++++++++++----------- base/comm/psb_dspgather.F90 | 205 +++++++++++++++++++++++++----------- base/comm/psb_ispgather.F90 | 205 +++++++++++++++++++++++++----------- base/comm/psb_lspgather.F90 | 205 +++++++++++++++++++++++++----------- base/comm/psb_sspgather.F90 | 205 +++++++++++++++++++++++++----------- base/comm/psb_zspgather.F90 | 205 +++++++++++++++++++++++++----------- 6 files changed, 852 insertions(+), 378 deletions(-) diff --git a/base/comm/psb_cspgather.F90 b/base/comm/psb_cspgather.F90 index 45dcc667..a0855982 100644 --- a/base/comm/psb_cspgather.F90 +++ b/base/comm/psb_cspgather.F90 @@ -70,7 +70,7 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:) integer(psb_ipk_) :: ierr(5) @@ -98,7 +98,12 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -137,27 +142,48 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep call psb_errpush(info,name); goto 9999 end if #endif + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call psb_realloc(nzg,glbia,info) + if (info == psb_success_) call psb_realloc(nzg,glbja,info) + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call psb_realloc(1,glbia,info) + if (info == psb_success_) call psb_realloc(1,glbja,info) + if (info == psb_success_) call glob_coo%allocate(1,1,1) + end if - if (info == psb_success_) call psb_realloc(nzg,glbia,info) - if (info == psb_success_) call psb_realloc(nzg,glbja,info) - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_c_spk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& - & glbia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& - & glbja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_spk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_spk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + + end if if (minfo /= psb_success_) then info = minfo @@ -172,11 +198,13 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - glob_coo%ia(1:nzg) = glbia(1:nzg) - glob_coo%ja(1:nzg) = glbja(1:nzg) - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + glob_coo%ia(1:nzg) = glbia(1:nzg) + glob_coo%ja(1:nzg) = glbja(1:nzg) + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if deallocate(glbia,glbja, stat=info) else @@ -228,7 +256,7 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -256,7 +284,13 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -269,7 +303,7 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee goto 9999 end if - + if (keeploc_) then call loca%cp_to(loc_coo) else @@ -294,7 +328,11 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 ! ! PLS REVIEW AND ADD OVERFLOW ERROR CHECKING @@ -303,24 +341,37 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_c_spk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_spk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_spk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') goto 9999 - end if + end if call loc_coo%free() ! ! Is the code below safe? For very large cases @@ -328,9 +379,11 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ @@ -346,7 +399,7 @@ subroutine psb_lcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_error_handler(ione*ictxt,err_act) return - + end subroutine psb_lcsp_allgather subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keeploc) @@ -378,7 +431,7 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -406,7 +459,13 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -444,25 +503,43 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_c_spk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_spk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_spk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') @@ -470,9 +547,11 @@ subroutine psb_lclcsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k end if call loc_coo%free() ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ diff --git a/base/comm/psb_dspgather.F90 b/base/comm/psb_dspgather.F90 index 62084d48..91b1c9ba 100644 --- a/base/comm/psb_dspgather.F90 +++ b/base/comm/psb_dspgather.F90 @@ -70,7 +70,7 @@ subroutine psb_dsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:) integer(psb_ipk_) :: ierr(5) @@ -98,7 +98,12 @@ subroutine psb_dsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -137,27 +142,48 @@ subroutine psb_dsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep call psb_errpush(info,name); goto 9999 end if #endif + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call psb_realloc(nzg,glbia,info) + if (info == psb_success_) call psb_realloc(nzg,glbja,info) + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call psb_realloc(1,glbia,info) + if (info == psb_success_) call psb_realloc(1,glbja,info) + if (info == psb_success_) call glob_coo%allocate(1,1,1) + end if - if (info == psb_success_) call psb_realloc(nzg,glbia,info) - if (info == psb_success_) call psb_realloc(nzg,glbja,info) - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_r_dpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& - & glbia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& - & glbja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_dpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_dpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + + end if if (minfo /= psb_success_) then info = minfo @@ -172,11 +198,13 @@ subroutine psb_dsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - glob_coo%ia(1:nzg) = glbia(1:nzg) - glob_coo%ja(1:nzg) = glbja(1:nzg) - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + glob_coo%ia(1:nzg) = glbia(1:nzg) + glob_coo%ja(1:nzg) = glbja(1:nzg) + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if deallocate(glbia,glbja, stat=info) else @@ -228,7 +256,7 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -256,7 +284,13 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -269,7 +303,7 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee goto 9999 end if - + if (keeploc_) then call loca%cp_to(loc_coo) else @@ -294,7 +328,11 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 ! ! PLS REVIEW AND ADD OVERFLOW ERROR CHECKING @@ -303,24 +341,37 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_r_dpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_dpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_dpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') goto 9999 - end if + end if call loc_coo%free() ! ! Is the code below safe? For very large cases @@ -328,9 +379,11 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ @@ -346,7 +399,7 @@ subroutine psb_ldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_error_handler(ione*ictxt,err_act) return - + end subroutine psb_ldsp_allgather subroutine psb_ldldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keeploc) @@ -378,7 +431,7 @@ subroutine psb_ldldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -406,7 +459,13 @@ subroutine psb_ldldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -444,25 +503,43 @@ subroutine psb_ldldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_r_dpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_dpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_dpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') @@ -470,9 +547,11 @@ subroutine psb_ldldsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k end if call loc_coo%free() ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ diff --git a/base/comm/psb_ispgather.F90 b/base/comm/psb_ispgather.F90 index d13c2e7c..4229e751 100644 --- a/base/comm/psb_ispgather.F90 +++ b/base/comm/psb_ispgather.F90 @@ -70,7 +70,7 @@ subroutine psb_isp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:) integer(psb_ipk_) :: ierr(5) @@ -98,7 +98,12 @@ subroutine psb_isp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -137,27 +142,48 @@ subroutine psb_isp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep call psb_errpush(info,name); goto 9999 end if #endif + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call psb_realloc(nzg,glbia,info) + if (info == psb_success_) call psb_realloc(nzg,glbja,info) + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call psb_realloc(1,glbia,info) + if (info == psb_success_) call psb_realloc(1,glbja,info) + if (info == psb_success_) call glob_coo%allocate(1,1,1) + end if - if (info == psb_success_) call psb_realloc(nzg,glbia,info) - if (info == psb_success_) call psb_realloc(nzg,glbja,info) - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_ipk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_ipk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& - & glbia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& - & glbja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_ipk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_ipk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_ipk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_ipk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + + end if if (minfo /= psb_success_) then info = minfo @@ -172,11 +198,13 @@ subroutine psb_isp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - glob_coo%ia(1:nzg) = glbia(1:nzg) - glob_coo%ja(1:nzg) = glbja(1:nzg) - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + glob_coo%ia(1:nzg) = glbia(1:nzg) + glob_coo%ja(1:nzg) = glbja(1:nzg) + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if deallocate(glbia,glbja, stat=info) else @@ -228,7 +256,7 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -256,7 +284,13 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -269,7 +303,7 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k goto 9999 end if - + if (keeploc_) then call loca%cp_to(loc_coo) else @@ -294,7 +328,11 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 ! ! PLS REVIEW AND ADD OVERFLOW ERROR CHECKING @@ -303,24 +341,37 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_ipk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_ipk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_ipk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_ipk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_ipk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_ipk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') goto 9999 - end if + end if call loc_coo%free() ! ! Is the code below safe? For very large cases @@ -328,9 +379,11 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ @@ -346,7 +399,7 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_error_handler(ione*ictxt,err_act) return - + end subroutine psb_@LX@sp_allgather subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keeploc) @@ -378,7 +431,7 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -406,7 +459,13 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -444,25 +503,43 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_ipk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_ipk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_ipk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_ipk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_ipk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_ipk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') @@ -470,9 +547,11 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn end if call loc_coo%free() ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ diff --git a/base/comm/psb_lspgather.F90 b/base/comm/psb_lspgather.F90 index 2f588116..d475750b 100644 --- a/base/comm/psb_lspgather.F90 +++ b/base/comm/psb_lspgather.F90 @@ -70,7 +70,7 @@ subroutine psb_lsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:) integer(psb_ipk_) :: ierr(5) @@ -98,7 +98,12 @@ subroutine psb_lsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -137,27 +142,48 @@ subroutine psb_lsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep call psb_errpush(info,name); goto 9999 end if #endif + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call psb_realloc(nzg,glbia,info) + if (info == psb_success_) call psb_realloc(nzg,glbja,info) + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call psb_realloc(1,glbia,info) + if (info == psb_success_) call psb_realloc(1,glbja,info) + if (info == psb_success_) call glob_coo%allocate(1,1,1) + end if - if (info == psb_success_) call psb_realloc(nzg,glbia,info) - if (info == psb_success_) call psb_realloc(nzg,glbja,info) - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_lpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& - & glbia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& - & glbja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_lpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_lpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + + end if if (minfo /= psb_success_) then info = minfo @@ -172,11 +198,13 @@ subroutine psb_lsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - glob_coo%ia(1:nzg) = glbia(1:nzg) - glob_coo%ja(1:nzg) = glbja(1:nzg) - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + glob_coo%ia(1:nzg) = glbia(1:nzg) + glob_coo%ja(1:nzg) = glbja(1:nzg) + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if deallocate(glbia,glbja, stat=info) else @@ -228,7 +256,7 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -256,7 +284,13 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -269,7 +303,7 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k goto 9999 end if - + if (keeploc_) then call loca%cp_to(loc_coo) else @@ -294,7 +328,11 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 ! ! PLS REVIEW AND ADD OVERFLOW ERROR CHECKING @@ -303,24 +341,37 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_lpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_lpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_lpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') goto 9999 - end if + end if call loc_coo%free() ! ! Is the code below safe? For very large cases @@ -328,9 +379,11 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ @@ -346,7 +399,7 @@ subroutine psb_@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_error_handler(ione*ictxt,err_act) return - + end subroutine psb_@LX@sp_allgather subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keeploc) @@ -378,7 +431,7 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -406,7 +459,13 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -444,25 +503,43 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_lpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_lpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_lpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') @@ -470,9 +547,11 @@ subroutine psb_@LX@@LX@sp_allgather(globa, loca, desc_a, info, root, dupl,keepn end if call loc_coo%free() ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ diff --git a/base/comm/psb_sspgather.F90 b/base/comm/psb_sspgather.F90 index 7123a988..2de0e046 100644 --- a/base/comm/psb_sspgather.F90 +++ b/base/comm/psb_sspgather.F90 @@ -70,7 +70,7 @@ subroutine psb_ssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:) integer(psb_ipk_) :: ierr(5) @@ -98,7 +98,12 @@ subroutine psb_ssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -137,27 +142,48 @@ subroutine psb_ssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep call psb_errpush(info,name); goto 9999 end if #endif + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call psb_realloc(nzg,glbia,info) + if (info == psb_success_) call psb_realloc(nzg,glbja,info) + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call psb_realloc(1,glbia,info) + if (info == psb_success_) call psb_realloc(1,glbja,info) + if (info == psb_success_) call glob_coo%allocate(1,1,1) + end if - if (info == psb_success_) call psb_realloc(nzg,glbia,info) - if (info == psb_success_) call psb_realloc(nzg,glbja,info) - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_r_spk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& - & glbia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& - & glbja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_spk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_spk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + + end if if (minfo /= psb_success_) then info = minfo @@ -172,11 +198,13 @@ subroutine psb_ssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - glob_coo%ia(1:nzg) = glbia(1:nzg) - glob_coo%ja(1:nzg) = glbja(1:nzg) - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + glob_coo%ia(1:nzg) = glbia(1:nzg) + glob_coo%ja(1:nzg) = glbja(1:nzg) + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if deallocate(glbia,glbja, stat=info) else @@ -228,7 +256,7 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -256,7 +284,13 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -269,7 +303,7 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee goto 9999 end if - + if (keeploc_) then call loca%cp_to(loc_coo) else @@ -294,7 +328,11 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 ! ! PLS REVIEW AND ADD OVERFLOW ERROR CHECKING @@ -303,24 +341,37 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_r_spk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_spk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_spk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') goto 9999 - end if + end if call loc_coo%free() ! ! Is the code below safe? For very large cases @@ -328,9 +379,11 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ @@ -346,7 +399,7 @@ subroutine psb_lssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_error_handler(ione*ictxt,err_act) return - + end subroutine psb_lssp_allgather subroutine psb_lslssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keeploc) @@ -378,7 +431,7 @@ subroutine psb_lslssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -406,7 +459,13 @@ subroutine psb_lslssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -444,25 +503,43 @@ subroutine psb_lslssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_r_spk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_spk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_r_spk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') @@ -470,9 +547,11 @@ subroutine psb_lslssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k end if call loc_coo%free() ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ diff --git a/base/comm/psb_zspgather.F90 b/base/comm/psb_zspgather.F90 index f4a27681..9ed7ed6f 100644 --- a/base/comm/psb_zspgather.F90 +++ b/base/comm/psb_zspgather.F90 @@ -70,7 +70,7 @@ subroutine psb_zsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: locia(:), locja(:), glbia(:), glbja(:) integer(psb_ipk_) :: ierr(5) @@ -98,7 +98,12 @@ subroutine psb_zsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -137,27 +142,48 @@ subroutine psb_zsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep call psb_errpush(info,name); goto 9999 end if #endif + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call psb_realloc(nzg,glbia,info) + if (info == psb_success_) call psb_realloc(nzg,glbja,info) + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call psb_realloc(1,glbia,info) + if (info == psb_success_) call psb_realloc(1,glbja,info) + if (info == psb_success_) call glob_coo%allocate(1,1,1) + end if - if (info == psb_success_) call psb_realloc(nzg,glbia,info) - if (info == psb_success_) call psb_realloc(nzg,glbja,info) - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_c_dpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& - & glbia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& - & glbja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_dpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_dpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locia,ndx,psb_mpi_lpk_,& + & glbia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(locja,ndx,psb_mpi_lpk_,& + & glbja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + + end if if (minfo /= psb_success_) then info = minfo @@ -172,11 +198,13 @@ subroutine psb_zsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - glob_coo%ia(1:nzg) = glbia(1:nzg) - glob_coo%ja(1:nzg) = glbja(1:nzg) - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + glob_coo%ia(1:nzg) = glbia(1:nzg) + glob_coo%ja(1:nzg) = glbja(1:nzg) + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if deallocate(glbia,glbja, stat=info) else @@ -228,7 +256,7 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee integer(psb_ipk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -256,7 +284,13 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -269,7 +303,7 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee goto 9999 end if - + if (keeploc_) then call loca%cp_to(loc_coo) else @@ -294,7 +328,11 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 ! ! PLS REVIEW AND ADD OVERFLOW ERROR CHECKING @@ -303,24 +341,37 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_c_dpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_dpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_dpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') goto 9999 - end if + end if call loc_coo%free() ! ! Is the code below safe? For very large cases @@ -328,9 +379,11 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee ! for very large cases it does not make sense to ! gather the matrix on a single procecss anyway... ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_ @@ -346,7 +399,7 @@ subroutine psb_lzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,kee call psb_error_handler(ione*ictxt,err_act) return - + end subroutine psb_lzsp_allgather subroutine psb_lzlzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keeploc) @@ -378,7 +431,7 @@ subroutine psb_lzlzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k integer(psb_lpk_) :: ip,naggrm1,naggrp1, i, j, k, nzl logical :: keepnum_, keeploc_ integer(psb_mpk_) :: ictxt,np,me - integer(psb_mpk_) :: icomm, minfo, ndx + integer(psb_mpk_) :: icomm, minfo, ndx, root_ integer(psb_mpk_), allocatable :: nzbr(:), idisp(:) integer(psb_lpk_), allocatable :: lnzbr(:) integer(psb_ipk_) :: ierr(5) @@ -406,7 +459,13 @@ subroutine psb_lzlzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k else keeploc_ = .true. end if - call globa%free() + if (present(root)) then + root_ = root + else + root_ = -1 + end if + + if ((root_ == -1).or.(root_ == me)) call globa%free() if (keepnum_) then nrg = desc_a%get_global_rows() @@ -444,25 +503,43 @@ subroutine psb_lzlzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k call psb_errpush(info,name); goto 9999 end if #endif - if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + if ((root_ == -1).or.(root_ == me)) then + if (info == psb_success_) call glob_coo%allocate(nrg,ncg,nzg) + else + if (info == psb_success_) call glob_coo%allocate(1_psb_lpk_,1_psb_lpk_,1_psb_lpk_) + end if if (info /= psb_success_) goto 9999 do ip=1,np idisp(ip) = sum(nzbr(1:ip-1)) enddo - ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& - & glob_coo%val,nzbr,idisp,& - & psb_mpi_c_dpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& - & glob_coo%ia,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - if (minfo == psb_success_) call & - & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& - & glob_coo%ja,nzbr,idisp,& - & psb_mpi_lpk_,icomm,minfo) - + ndx = nzbr(me+1) + + if (root_ == -1) then + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_dpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_allgatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,icomm,minfo) + else + call mpi_gatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& + & glob_coo%val,nzbr,idisp,& + & psb_mpi_c_dpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ia,ndx,psb_mpi_lpk_,& + & glob_coo%ia,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + if (minfo == psb_success_) call & + & mpi_gatherv(loc_coo%ja,ndx,psb_mpi_lpk_,& + & glob_coo%ja,nzbr,idisp,& + & psb_mpi_lpk_,root_,icomm,minfo) + end if if (minfo /= psb_success_) then info = minfo call psb_errpush(psb_err_internal_error_,name,a_err=' from mpi_allgatherv') @@ -470,9 +547,11 @@ subroutine psb_lzlzsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,k end if call loc_coo%free() ! - call glob_coo%set_nzeros(nzg) - if (present(dupl)) call glob_coo%set_dupl(dupl) - call globa%mv_from(glob_coo) + if ((root_ == -1).or.(root_ == me)) then + call glob_coo%set_nzeros(nzg) + if (present(dupl)) call glob_coo%set_dupl(dupl) + call globa%mv_from(glob_coo) + end if else write(psb_err_unit,*) 'SP_ALLGATHER: Not implemented yet with keepnum ',keepnum_