From bdc3cc925a07ce0939af7e9c7feca278f67ee3c2 Mon Sep 17 00:00:00 2001 From: Salvatore Filippone Date: Thu, 1 Aug 2013 15:44:54 +0000 Subject: [PATCH] psblas-3.99: base/comm/psb_cspgather.F90 base/comm/psb_dspgather.F90 base/comm/psb_sspgather.F90 base/comm/psb_zspgather.F90 base/internals/psi_cswapdata.F90 base/internals/psi_desc_impl.f90 base/internals/psi_dswapdata.F90 base/internals/psi_sswapdata.F90 base/internals/psi_zswapdata.F90 base/modules/Makefile base/modules/psb_c_base_vect_mod.f90 base/modules/psb_c_tools_mod.f90 base/modules/psb_cd_tools_mod.f90 base/modules/psb_d_base_vect_mod.f90 base/modules/psb_d_tools_mod.f90 base/modules/psb_desc_mod.f90 base/modules/psb_s_base_vect_mod.f90 base/modules/psb_s_tools_mod.f90 base/modules/psb_z_base_vect_mod.f90 base/modules/psb_z_tools_mod.f90 base/modules/psi_c_mod.f90 base/modules/psi_d_mod.f90 base/modules/psi_i_mod.f90 base/modules/psi_s_mod.f90 base/modules/psi_z_mod.f90 base/tools/psb_ccdbldext.F90 base/tools/psb_cd_lstext.f90 base/tools/psb_cdcpy.F90 base/tools/psb_csphalo.F90 base/tools/psb_dcdbldext.F90 base/tools/psb_dsphalo.F90 base/tools/psb_icdasb.F90 base/tools/psb_scdbldext.F90 base/tools/psb_ssphalo.F90 base/tools/psb_zcdbldext.F90 base/tools/psb_zsphalo.F90 First round of changes: Add encapsulated vectors for DESC. Add MOLD to CDASB. Change swapdata. --- base/comm/psb_cspgather.F90 | 4 +- base/comm/psb_dspgather.F90 | 4 +- base/comm/psb_sspgather.F90 | 4 +- base/comm/psb_zspgather.F90 | 4 +- base/internals/psi_cswapdata.F90 | 369 ++++++++++++++++++++++++++- base/internals/psi_desc_impl.f90 | 9 +- base/internals/psi_dswapdata.F90 | 369 ++++++++++++++++++++++++++- base/internals/psi_sswapdata.F90 | 369 ++++++++++++++++++++++++++- base/internals/psi_zswapdata.F90 | 369 ++++++++++++++++++++++++++- base/modules/Makefile | 2 +- base/modules/psb_c_base_vect_mod.f90 | 38 ++- base/modules/psb_c_tools_mod.f90 | 2 +- base/modules/psb_cd_tools_mod.f90 | 20 +- base/modules/psb_d_base_vect_mod.f90 | 38 ++- base/modules/psb_d_tools_mod.f90 | 2 +- base/modules/psb_desc_mod.f90 | 107 +++++++- base/modules/psb_s_base_vect_mod.f90 | 38 ++- base/modules/psb_s_tools_mod.f90 | 2 +- base/modules/psb_z_base_vect_mod.f90 | 38 ++- base/modules/psb_z_tools_mod.f90 | 2 +- base/modules/psi_c_mod.f90 | 12 +- base/modules/psi_d_mod.f90 | 13 +- base/modules/psi_i_mod.f90 | 5 +- base/modules/psi_s_mod.f90 | 12 +- base/modules/psi_z_mod.f90 | 12 +- base/tools/psb_ccdbldext.F90 | 2 +- base/tools/psb_cd_lstext.f90 | 2 +- base/tools/psb_cdcpy.F90 | 28 +- base/tools/psb_csphalo.F90 | 4 +- base/tools/psb_dcdbldext.F90 | 2 +- base/tools/psb_dsphalo.F90 | 4 +- base/tools/psb_icdasb.F90 | 5 +- base/tools/psb_scdbldext.F90 | 2 +- base/tools/psb_ssphalo.F90 | 4 +- base/tools/psb_zcdbldext.F90 | 2 +- base/tools/psb_zsphalo.F90 | 4 +- 36 files changed, 1817 insertions(+), 86 deletions(-) diff --git a/base/comm/psb_cspgather.F90 b/base/comm/psb_cspgather.F90 index 7a70d748..531bd4c6 100644 --- a/base/comm/psb_cspgather.F90 +++ b/base/comm/psb_cspgather.F90 @@ -113,9 +113,9 @@ subroutine psb_csp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep idisp(ip) = sum(nzbr(1:ip-1)) enddo ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,mpi_complex,& + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_spk_,& & glob_coo%val,nzbr,idisp,& - & mpi_complex,icomm,minfo) + & psb_mpi_c_spk_,icomm,minfo) if (minfo == psb_success_) call & & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_ipk_integer,& & glob_coo%ia,nzbr,idisp,& diff --git a/base/comm/psb_dspgather.F90 b/base/comm/psb_dspgather.F90 index dabe30da..c032874b 100644 --- a/base/comm/psb_dspgather.F90 +++ b/base/comm/psb_dspgather.F90 @@ -113,9 +113,9 @@ subroutine psb_dsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep idisp(ip) = sum(nzbr(1:ip-1)) enddo ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,mpi_double_precision,& + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_dpk_,& & glob_coo%val,nzbr,idisp,& - & mpi_double_precision,icomm,minfo) + & psb_mpi_r_dpk_,icomm,minfo) if (minfo == psb_success_) call & & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_ipk_integer,& & glob_coo%ia,nzbr,idisp,& diff --git a/base/comm/psb_sspgather.F90 b/base/comm/psb_sspgather.F90 index a2f3a6b0..231eda6a 100644 --- a/base/comm/psb_sspgather.F90 +++ b/base/comm/psb_sspgather.F90 @@ -113,9 +113,9 @@ subroutine psb_ssp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep idisp(ip) = sum(nzbr(1:ip-1)) enddo ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,mpi_real,& + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_r_spk_,& & glob_coo%val,nzbr,idisp,& - & mpi_real,icomm,minfo) + & psb_mpi_r_spk_,icomm,minfo) if (minfo == psb_success_) call & & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_ipk_integer,& & glob_coo%ia,nzbr,idisp,& diff --git a/base/comm/psb_zspgather.F90 b/base/comm/psb_zspgather.F90 index b14e0822..c74f1676 100644 --- a/base/comm/psb_zspgather.F90 +++ b/base/comm/psb_zspgather.F90 @@ -113,9 +113,9 @@ subroutine psb_zsp_allgather(globa, loca, desc_a, info, root, dupl,keepnum,keep idisp(ip) = sum(nzbr(1:ip-1)) enddo ndx = nzbr(me+1) - call mpi_allgatherv(loc_coo%val,ndx,mpi_double_complex,& + call mpi_allgatherv(loc_coo%val,ndx,psb_mpi_c_dpk_,& & glob_coo%val,nzbr,idisp,& - & mpi_double_complex,icomm,minfo) + & psb_mpi_c_dpk_,icomm,minfo) if (minfo == psb_success_) call & & mpi_allgatherv(loc_coo%ia,ndx,psb_mpi_ipk_integer,& & glob_coo%ia,nzbr,idisp,& diff --git a/base/internals/psi_cswapdata.F90 b/base/internals/psi_cswapdata.F90 index d70b15a8..11eea421 100644 --- a/base/internals/psi_cswapdata.F90 +++ b/base/internals/psi_cswapdata.F90 @@ -1040,6 +1040,7 @@ subroutine psi_cswapdata_vect(flag,beta,y,desc_a,work,info,data) ! locals integer(psb_ipk_) :: ictxt, np, me, icomm, idxs, idxr, totxch, data_, err_act integer(psb_ipk_), pointer :: d_idx(:) + class(psb_i_base_vect_type), pointer :: d_vidx character(len=20) :: name info=psb_success_ @@ -1068,13 +1069,14 @@ subroutine psi_cswapdata_vect(flag,beta,y,desc_a,work,info,data) data_ = psb_comm_halo_ end if - call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) +!!$ call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) + call desc_a%get_list(data_,d_vidx,totxch,idxr,idxs,info) if (info /= psb_success_) then call psb_errpush(psb_err_internal_error_,name,a_err='psb_cd_get_list') goto 9999 end if - call psi_swapdata(ictxt,icomm,flag,beta,y,d_idx,totxch,idxs,idxr,work,info) + call psi_swapdata(ictxt,icomm,flag,beta,y,d_vidx,totxch,idxs,idxr,work,info) if (info /= psb_success_) goto 9999 call psb_erractionrestore(err_act) @@ -1451,3 +1453,366 @@ subroutine psi_cswapidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv, return end subroutine psi_cswapidx_vect + +subroutine psi_cswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + + use psi_mod, psb_protect_name => psi_cswap_vidx_vect + use psb_error_mod + use psb_desc_mod + use psb_penv_mod + use psb_c_base_vect_mod +#ifdef MPI_MOD + use mpi +#endif + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_c_base_vect_type) :: y + complex(psb_spk_) :: beta + complex(psb_spk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + + ! locals + integer(psb_mpik_) :: ictxt, icomm, np, me,& + & proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable, dimension(:) :: bsdidx, brvidx,& + & sdsz, rvsz, prcid, rvhd, sdhd + integer(psb_ipk_) :: nesd, nerv,& + & err_act, i, idx_pt, totsnd_, totrcv_,& + & snd_pt, rcv_pt, pnti, n + integer(psb_ipk_) :: ierr(5) + logical :: swap_mpi, swap_sync, swap_send, swap_recv,& + & albf,do_send,do_recv + logical, parameter :: usersend=.false. + + complex(psb_spk_), pointer, dimension(:) :: sndbuf, rcvbuf +#ifdef HAVE_VOLATILE + volatile :: sndbuf, rcvbuf +#endif + character(len=20) :: name + + info=psb_success_ + name='psi_swap_datav' + call psb_erractionsave(err_act) + ictxt = iictxt + icomm = iicomm + call psb_info(ictxt,me,np) + if (np == -1) then + info=psb_err_context_error_ + call psb_errpush(info,name) + goto 9999 + endif + + n=1 + + swap_mpi = iand(flag,psb_swap_mpi_) /= 0 + swap_sync = iand(flag,psb_swap_sync_) /= 0 + swap_send = iand(flag,psb_swap_send_) /= 0 + swap_recv = iand(flag,psb_swap_recv_) /= 0 + do_send = swap_mpi .or. swap_sync .or. swap_send + do_recv = swap_mpi .or. swap_sync .or. swap_recv + + totrcv_ = totrcv * n + totsnd_ = totsnd * n + + if (swap_mpi) then + allocate(sdsz(0:np-1), rvsz(0:np-1), bsdidx(0:np-1),& + & brvidx(0:np-1), rvhd(0:np-1), sdhd(0:np-1), prcid(0:np-1),& + & stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + rvhd(:) = mpi_request_null + sdsz(:) = 0 + rvsz(:) = 0 + + ! prepare info for communications + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + call psb_get_rank(prcid(proc_to_comm),ictxt,proc_to_comm) + + brvidx(proc_to_comm) = rcv_pt + rvsz(proc_to_comm) = nerv + + bsdidx(proc_to_comm) = snd_pt + sdsz(proc_to_comm) = nesd + + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + + end do + + else + allocate(rvhd(totxch),prcid(totxch),stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + end if + + + totrcv_ = max(totrcv_,1) + totsnd_ = max(totsnd_,1) + if((totrcv_+totsnd_) < size(work)) then + sndbuf => work(1:totsnd_) + rcvbuf => work(totsnd_+1:totsnd_+totrcv_) + albf=.false. + else + allocate(sndbuf(totsnd_),rcvbuf(totrcv_), stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + albf=.true. + end if + + + if (do_send) then + + ! Pack send buffers + pnti = 1 + snd_pt = 1 + do i=1, totxch + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+nerv+psb_n_elem_send_ + call y%gth(idx_pt,nesd,idx,& + & sndbuf(snd_pt:snd_pt+nesd-1)) + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + ! Case SWAP_MPI + if (swap_mpi) then + + ! swap elements using mpi_alltoallv + call mpi_alltoallv(sndbuf,sdsz,bsdidx,& + & psb_mpi_c_spk_,rcvbuf,rvsz,& + & brvidx,psb_mpi_c_spk_,icomm,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + + else if (swap_sync) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + if (proc_to_comm < me) then + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + else if (proc_to_comm > me) then + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send .and. swap_recv) then + + ! First I post all the non blocking receives + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + call psb_get_rank(prcid(i),ictxt,proc_to_comm) + if ((nerv>0).and.(proc_to_comm /= me)) then + p2ptag = psb_complex_swap_tag + call mpi_irecv(rcvbuf(rcv_pt),nerv,& + & psb_mpi_c_spk_,prcid(i),& + & p2ptag, icomm,rvhd(i),iret) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + ! Then I post all the blocking sends + if (usersend) call mpi_barrier(icomm,iret) + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_complex_swap_tag + + if ((nesd>0).and.(proc_to_comm /= me)) then + if (usersend) then + call mpi_rsend(sndbuf(snd_pt),nesd,& + & psb_mpi_c_spk_,prcid(i),& + & p2ptag,icomm,iret) + else + call mpi_send(sndbuf(snd_pt),nesd,& + & psb_mpi_c_spk_,prcid(i),& + & p2ptag,icomm,iret) + end if + + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + pnti = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_complex_swap_tag + + if ((proc_to_comm /= me).and.(nerv>0)) then + call mpi_wait(rvhd(i),p2pstat,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + else if (swap_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (do_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+psb_n_elem_recv_ + call y%sct(idx_pt,nerv,idx,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (swap_mpi) then + deallocate(sdsz,rvsz,bsdidx,brvidx,rvhd,prcid,sdhd,& + & stat=info) + else + deallocate(rvhd,prcid,stat=info) + end if + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + if(albf) deallocate(sndbuf,rcvbuf,stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return +end subroutine psi_cswap_vidx_vect + diff --git a/base/internals/psi_desc_impl.f90 b/base/internals/psi_desc_impl.f90 index c53d43ec..0a48ec4a 100644 --- a/base/internals/psi_desc_impl.f90 +++ b/base/internals/psi_desc_impl.f90 @@ -60,7 +60,7 @@ subroutine psi_renum_index(iperm,idx,info) end subroutine psi_renum_index -subroutine psi_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info) +subroutine psi_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) use psi_mod, psi_protect_name => psi_cnv_dsc use psb_realloc_mod @@ -70,6 +70,7 @@ subroutine psi_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info) integer(psb_ipk_), intent(in) :: halo_in(:), ovrlap_in(:),ext_in(:) type(psb_desc_type), intent(inout) :: cdesc integer(psb_ipk_), intent(out) :: info + class(psb_i_base_vect_type), optional, intent(in) :: mold ! ....local scalars.... integer(psb_ipk_) :: np,me @@ -178,6 +179,12 @@ subroutine psi_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info) call psi_crea_bnd_elem(idx_out,cdesc,info) if (info == psb_success_) call psb_move_alloc(idx_out,cdesc%bnd_elem,info) + call cdesc%v_halo_index%bld(cdesc%halo_index,mold=mold) + call cdesc%v_ext_index%bld(cdesc%ext_index,mold=mold) + call cdesc%v_ovrlap_index%bld(cdesc%ovrlap_index,mold=mold) + call cdesc%v_ovr_mst_idx%bld(cdesc%ovr_mst_idx,mold=mold) + + if (info /= psb_success_) then call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_crea_bnd_elem') goto 9999 diff --git a/base/internals/psi_dswapdata.F90 b/base/internals/psi_dswapdata.F90 index fb9337cc..562d1481 100644 --- a/base/internals/psi_dswapdata.F90 +++ b/base/internals/psi_dswapdata.F90 @@ -1040,6 +1040,7 @@ subroutine psi_dswapdata_vect(flag,beta,y,desc_a,work,info,data) ! locals integer(psb_ipk_) :: ictxt, np, me, icomm, idxs, idxr, totxch, data_, err_act integer(psb_ipk_), pointer :: d_idx(:) + class(psb_i_base_vect_type), pointer :: d_vidx character(len=20) :: name info=psb_success_ @@ -1068,13 +1069,14 @@ subroutine psi_dswapdata_vect(flag,beta,y,desc_a,work,info,data) data_ = psb_comm_halo_ end if - call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) +!!$ call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) + call desc_a%get_list(data_,d_vidx,totxch,idxr,idxs,info) if (info /= psb_success_) then call psb_errpush(psb_err_internal_error_,name,a_err='psb_cd_get_list') goto 9999 end if - call psi_swapdata(ictxt,icomm,flag,beta,y,d_idx,totxch,idxs,idxr,work,info) + call psi_swapdata(ictxt,icomm,flag,beta,y,d_vidx,totxch,idxs,idxr,work,info) if (info /= psb_success_) goto 9999 call psb_erractionrestore(err_act) @@ -1451,3 +1453,366 @@ subroutine psi_dswapidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv, return end subroutine psi_dswapidx_vect + +subroutine psi_dswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + + use psi_mod, psb_protect_name => psi_dswap_vidx_vect + use psb_error_mod + use psb_desc_mod + use psb_penv_mod + use psb_d_base_vect_mod +#ifdef MPI_MOD + use mpi +#endif + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_d_base_vect_type) :: y + real(psb_dpk_) :: beta + real(psb_dpk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + + ! locals + integer(psb_mpik_) :: ictxt, icomm, np, me,& + & proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable, dimension(:) :: bsdidx, brvidx,& + & sdsz, rvsz, prcid, rvhd, sdhd + integer(psb_ipk_) :: nesd, nerv,& + & err_act, i, idx_pt, totsnd_, totrcv_,& + & snd_pt, rcv_pt, pnti, n + integer(psb_ipk_) :: ierr(5) + logical :: swap_mpi, swap_sync, swap_send, swap_recv,& + & albf,do_send,do_recv + logical, parameter :: usersend=.false. + + real(psb_dpk_), pointer, dimension(:) :: sndbuf, rcvbuf +#ifdef HAVE_VOLATILE + volatile :: sndbuf, rcvbuf +#endif + character(len=20) :: name + + info=psb_success_ + name='psi_swap_datav' + call psb_erractionsave(err_act) + ictxt = iictxt + icomm = iicomm + call psb_info(ictxt,me,np) + if (np == -1) then + info=psb_err_context_error_ + call psb_errpush(info,name) + goto 9999 + endif + + n=1 + + swap_mpi = iand(flag,psb_swap_mpi_) /= 0 + swap_sync = iand(flag,psb_swap_sync_) /= 0 + swap_send = iand(flag,psb_swap_send_) /= 0 + swap_recv = iand(flag,psb_swap_recv_) /= 0 + do_send = swap_mpi .or. swap_sync .or. swap_send + do_recv = swap_mpi .or. swap_sync .or. swap_recv + + totrcv_ = totrcv * n + totsnd_ = totsnd * n + + if (swap_mpi) then + allocate(sdsz(0:np-1), rvsz(0:np-1), bsdidx(0:np-1),& + & brvidx(0:np-1), rvhd(0:np-1), sdhd(0:np-1), prcid(0:np-1),& + & stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + rvhd(:) = mpi_request_null + sdsz(:) = 0 + rvsz(:) = 0 + + ! prepare info for communications + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + call psb_get_rank(prcid(proc_to_comm),ictxt,proc_to_comm) + + brvidx(proc_to_comm) = rcv_pt + rvsz(proc_to_comm) = nerv + + bsdidx(proc_to_comm) = snd_pt + sdsz(proc_to_comm) = nesd + + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + + end do + + else + allocate(rvhd(totxch),prcid(totxch),stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + end if + + + totrcv_ = max(totrcv_,1) + totsnd_ = max(totsnd_,1) + if((totrcv_+totsnd_) < size(work)) then + sndbuf => work(1:totsnd_) + rcvbuf => work(totsnd_+1:totsnd_+totrcv_) + albf=.false. + else + allocate(sndbuf(totsnd_),rcvbuf(totrcv_), stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + albf=.true. + end if + + + if (do_send) then + + ! Pack send buffers + pnti = 1 + snd_pt = 1 + do i=1, totxch + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+nerv+psb_n_elem_send_ + call y%gth(idx_pt,nesd,idx,& + & sndbuf(snd_pt:snd_pt+nesd-1)) + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + ! Case SWAP_MPI + if (swap_mpi) then + + ! swap elements using mpi_alltoallv + call mpi_alltoallv(sndbuf,sdsz,bsdidx,& + & psb_mpi_r_dpk_,rcvbuf,rvsz,& + & brvidx,psb_mpi_r_dpk_,icomm,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + + else if (swap_sync) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + if (proc_to_comm < me) then + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + else if (proc_to_comm > me) then + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send .and. swap_recv) then + + ! First I post all the non blocking receives + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + call psb_get_rank(prcid(i),ictxt,proc_to_comm) + if ((nerv>0).and.(proc_to_comm /= me)) then + p2ptag = psb_double_swap_tag + call mpi_irecv(rcvbuf(rcv_pt),nerv,& + & psb_mpi_r_dpk_,prcid(i),& + & p2ptag, icomm,rvhd(i),iret) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + ! Then I post all the blocking sends + if (usersend) call mpi_barrier(icomm,iret) + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_double_swap_tag + + if ((nesd>0).and.(proc_to_comm /= me)) then + if (usersend) then + call mpi_rsend(sndbuf(snd_pt),nesd,& + & psb_mpi_r_dpk_,prcid(i),& + & p2ptag,icomm,iret) + else + call mpi_send(sndbuf(snd_pt),nesd,& + & psb_mpi_r_dpk_,prcid(i),& + & p2ptag,icomm,iret) + end if + + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + pnti = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_double_swap_tag + + if ((proc_to_comm /= me).and.(nerv>0)) then + call mpi_wait(rvhd(i),p2pstat,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + else if (swap_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (do_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+psb_n_elem_recv_ + call y%sct(idx_pt,nerv,idx,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (swap_mpi) then + deallocate(sdsz,rvsz,bsdidx,brvidx,rvhd,prcid,sdhd,& + & stat=info) + else + deallocate(rvhd,prcid,stat=info) + end if + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + if(albf) deallocate(sndbuf,rcvbuf,stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return +end subroutine psi_dswap_vidx_vect + diff --git a/base/internals/psi_sswapdata.F90 b/base/internals/psi_sswapdata.F90 index 5a8681d8..75fc4024 100644 --- a/base/internals/psi_sswapdata.F90 +++ b/base/internals/psi_sswapdata.F90 @@ -1040,6 +1040,7 @@ subroutine psi_sswapdata_vect(flag,beta,y,desc_a,work,info,data) ! locals integer(psb_ipk_) :: ictxt, np, me, icomm, idxs, idxr, totxch, data_, err_act integer(psb_ipk_), pointer :: d_idx(:) + class(psb_i_base_vect_type), pointer :: d_vidx character(len=20) :: name info=psb_success_ @@ -1068,13 +1069,14 @@ subroutine psi_sswapdata_vect(flag,beta,y,desc_a,work,info,data) data_ = psb_comm_halo_ end if - call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) +!!$ call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) + call desc_a%get_list(data_,d_vidx,totxch,idxr,idxs,info) if (info /= psb_success_) then call psb_errpush(psb_err_internal_error_,name,a_err='psb_cd_get_list') goto 9999 end if - call psi_swapdata(ictxt,icomm,flag,beta,y,d_idx,totxch,idxs,idxr,work,info) + call psi_swapdata(ictxt,icomm,flag,beta,y,d_vidx,totxch,idxs,idxr,work,info) if (info /= psb_success_) goto 9999 call psb_erractionrestore(err_act) @@ -1451,3 +1453,366 @@ subroutine psi_sswapidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv, return end subroutine psi_sswapidx_vect + +subroutine psi_sswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + + use psi_mod, psb_protect_name => psi_sswap_vidx_vect + use psb_error_mod + use psb_desc_mod + use psb_penv_mod + use psb_s_base_vect_mod +#ifdef MPI_MOD + use mpi +#endif + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_s_base_vect_type) :: y + real(psb_spk_) :: beta + real(psb_spk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + + ! locals + integer(psb_mpik_) :: ictxt, icomm, np, me,& + & proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable, dimension(:) :: bsdidx, brvidx,& + & sdsz, rvsz, prcid, rvhd, sdhd + integer(psb_ipk_) :: nesd, nerv,& + & err_act, i, idx_pt, totsnd_, totrcv_,& + & snd_pt, rcv_pt, pnti, n + integer(psb_ipk_) :: ierr(5) + logical :: swap_mpi, swap_sync, swap_send, swap_recv,& + & albf,do_send,do_recv + logical, parameter :: usersend=.false. + + real(psb_spk_), pointer, dimension(:) :: sndbuf, rcvbuf +#ifdef HAVE_VOLATILE + volatile :: sndbuf, rcvbuf +#endif + character(len=20) :: name + + info=psb_success_ + name='psi_swap_datav' + call psb_erractionsave(err_act) + ictxt = iictxt + icomm = iicomm + call psb_info(ictxt,me,np) + if (np == -1) then + info=psb_err_context_error_ + call psb_errpush(info,name) + goto 9999 + endif + + n=1 + + swap_mpi = iand(flag,psb_swap_mpi_) /= 0 + swap_sync = iand(flag,psb_swap_sync_) /= 0 + swap_send = iand(flag,psb_swap_send_) /= 0 + swap_recv = iand(flag,psb_swap_recv_) /= 0 + do_send = swap_mpi .or. swap_sync .or. swap_send + do_recv = swap_mpi .or. swap_sync .or. swap_recv + + totrcv_ = totrcv * n + totsnd_ = totsnd * n + + if (swap_mpi) then + allocate(sdsz(0:np-1), rvsz(0:np-1), bsdidx(0:np-1),& + & brvidx(0:np-1), rvhd(0:np-1), sdhd(0:np-1), prcid(0:np-1),& + & stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + rvhd(:) = mpi_request_null + sdsz(:) = 0 + rvsz(:) = 0 + + ! prepare info for communications + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + call psb_get_rank(prcid(proc_to_comm),ictxt,proc_to_comm) + + brvidx(proc_to_comm) = rcv_pt + rvsz(proc_to_comm) = nerv + + bsdidx(proc_to_comm) = snd_pt + sdsz(proc_to_comm) = nesd + + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + + end do + + else + allocate(rvhd(totxch),prcid(totxch),stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + end if + + + totrcv_ = max(totrcv_,1) + totsnd_ = max(totsnd_,1) + if((totrcv_+totsnd_) < size(work)) then + sndbuf => work(1:totsnd_) + rcvbuf => work(totsnd_+1:totsnd_+totrcv_) + albf=.false. + else + allocate(sndbuf(totsnd_),rcvbuf(totrcv_), stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + albf=.true. + end if + + + if (do_send) then + + ! Pack send buffers + pnti = 1 + snd_pt = 1 + do i=1, totxch + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+nerv+psb_n_elem_send_ + call y%gth(idx_pt,nesd,idx,& + & sndbuf(snd_pt:snd_pt+nesd-1)) + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + ! Case SWAP_MPI + if (swap_mpi) then + + ! swap elements using mpi_alltoallv + call mpi_alltoallv(sndbuf,sdsz,bsdidx,& + & psb_mpi_r_spk_,rcvbuf,rvsz,& + & brvidx,psb_mpi_r_spk_,icomm,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + + else if (swap_sync) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + if (proc_to_comm < me) then + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + else if (proc_to_comm > me) then + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send .and. swap_recv) then + + ! First I post all the non blocking receives + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + call psb_get_rank(prcid(i),ictxt,proc_to_comm) + if ((nerv>0).and.(proc_to_comm /= me)) then + p2ptag = psb_real_swap_tag + call mpi_irecv(rcvbuf(rcv_pt),nerv,& + & psb_mpi_r_spk_,prcid(i),& + & p2ptag, icomm,rvhd(i),iret) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + ! Then I post all the blocking sends + if (usersend) call mpi_barrier(icomm,iret) + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_real_swap_tag + + if ((nesd>0).and.(proc_to_comm /= me)) then + if (usersend) then + call mpi_rsend(sndbuf(snd_pt),nesd,& + & psb_mpi_r_spk_,prcid(i),& + & p2ptag,icomm,iret) + else + call mpi_send(sndbuf(snd_pt),nesd,& + & psb_mpi_r_spk_,prcid(i),& + & p2ptag,icomm,iret) + end if + + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + pnti = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_real_swap_tag + + if ((proc_to_comm /= me).and.(nerv>0)) then + call mpi_wait(rvhd(i),p2pstat,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + else if (swap_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (do_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+psb_n_elem_recv_ + call y%sct(idx_pt,nerv,idx,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (swap_mpi) then + deallocate(sdsz,rvsz,bsdidx,brvidx,rvhd,prcid,sdhd,& + & stat=info) + else + deallocate(rvhd,prcid,stat=info) + end if + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + if(albf) deallocate(sndbuf,rcvbuf,stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return +end subroutine psi_sswap_vidx_vect + diff --git a/base/internals/psi_zswapdata.F90 b/base/internals/psi_zswapdata.F90 index 1f96e4d3..379b02fd 100644 --- a/base/internals/psi_zswapdata.F90 +++ b/base/internals/psi_zswapdata.F90 @@ -1040,6 +1040,7 @@ subroutine psi_zswapdata_vect(flag,beta,y,desc_a,work,info,data) ! locals integer(psb_ipk_) :: ictxt, np, me, icomm, idxs, idxr, totxch, data_, err_act integer(psb_ipk_), pointer :: d_idx(:) + class(psb_i_base_vect_type), pointer :: d_vidx character(len=20) :: name info=psb_success_ @@ -1068,13 +1069,14 @@ subroutine psi_zswapdata_vect(flag,beta,y,desc_a,work,info,data) data_ = psb_comm_halo_ end if - call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) +!!$ call desc_a%get_list(data_,d_idx,totxch,idxr,idxs,info) + call desc_a%get_list(data_,d_vidx,totxch,idxr,idxs,info) if (info /= psb_success_) then call psb_errpush(psb_err_internal_error_,name,a_err='psb_cd_get_list') goto 9999 end if - call psi_swapdata(ictxt,icomm,flag,beta,y,d_idx,totxch,idxs,idxr,work,info) + call psi_swapdata(ictxt,icomm,flag,beta,y,d_vidx,totxch,idxs,idxr,work,info) if (info /= psb_success_) goto 9999 call psb_erractionrestore(err_act) @@ -1451,3 +1453,366 @@ subroutine psi_zswapidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv, return end subroutine psi_zswapidx_vect + +subroutine psi_zswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + + use psi_mod, psb_protect_name => psi_zswap_vidx_vect + use psb_error_mod + use psb_desc_mod + use psb_penv_mod + use psb_z_base_vect_mod +#ifdef MPI_MOD + use mpi +#endif + implicit none +#ifdef MPI_H + include 'mpif.h' +#endif + + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_z_base_vect_type) :: y + complex(psb_dpk_) :: beta + complex(psb_dpk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + + ! locals + integer(psb_mpik_) :: ictxt, icomm, np, me,& + & proc_to_comm, p2ptag, p2pstat(mpi_status_size), iret + integer(psb_mpik_), allocatable, dimension(:) :: bsdidx, brvidx,& + & sdsz, rvsz, prcid, rvhd, sdhd + integer(psb_ipk_) :: nesd, nerv,& + & err_act, i, idx_pt, totsnd_, totrcv_,& + & snd_pt, rcv_pt, pnti, n + integer(psb_ipk_) :: ierr(5) + logical :: swap_mpi, swap_sync, swap_send, swap_recv,& + & albf,do_send,do_recv + logical, parameter :: usersend=.false. + + complex(psb_dpk_), pointer, dimension(:) :: sndbuf, rcvbuf +#ifdef HAVE_VOLATILE + volatile :: sndbuf, rcvbuf +#endif + character(len=20) :: name + + info=psb_success_ + name='psi_swap_datav' + call psb_erractionsave(err_act) + ictxt = iictxt + icomm = iicomm + call psb_info(ictxt,me,np) + if (np == -1) then + info=psb_err_context_error_ + call psb_errpush(info,name) + goto 9999 + endif + + n=1 + + swap_mpi = iand(flag,psb_swap_mpi_) /= 0 + swap_sync = iand(flag,psb_swap_sync_) /= 0 + swap_send = iand(flag,psb_swap_send_) /= 0 + swap_recv = iand(flag,psb_swap_recv_) /= 0 + do_send = swap_mpi .or. swap_sync .or. swap_send + do_recv = swap_mpi .or. swap_sync .or. swap_recv + + totrcv_ = totrcv * n + totsnd_ = totsnd * n + + if (swap_mpi) then + allocate(sdsz(0:np-1), rvsz(0:np-1), bsdidx(0:np-1),& + & brvidx(0:np-1), rvhd(0:np-1), sdhd(0:np-1), prcid(0:np-1),& + & stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + rvhd(:) = mpi_request_null + sdsz(:) = 0 + rvsz(:) = 0 + + ! prepare info for communications + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + call psb_get_rank(prcid(proc_to_comm),ictxt,proc_to_comm) + + brvidx(proc_to_comm) = rcv_pt + rvsz(proc_to_comm) = nerv + + bsdidx(proc_to_comm) = snd_pt + sdsz(proc_to_comm) = nesd + + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + + end do + + else + allocate(rvhd(totxch),prcid(totxch),stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + end if + + + totrcv_ = max(totrcv_,1) + totsnd_ = max(totsnd_,1) + if((totrcv_+totsnd_) < size(work)) then + sndbuf => work(1:totsnd_) + rcvbuf => work(totsnd_+1:totsnd_+totrcv_) + albf=.false. + else + allocate(sndbuf(totsnd_),rcvbuf(totrcv_), stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + albf=.true. + end if + + + if (do_send) then + + ! Pack send buffers + pnti = 1 + snd_pt = 1 + do i=1, totxch + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+nerv+psb_n_elem_send_ + call y%gth(idx_pt,nesd,idx,& + & sndbuf(snd_pt:snd_pt+nesd-1)) + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + ! Case SWAP_MPI + if (swap_mpi) then + + ! swap elements using mpi_alltoallv + call mpi_alltoallv(sndbuf,sdsz,bsdidx,& + & psb_mpi_c_dpk_,rcvbuf,rvsz,& + & brvidx,psb_mpi_c_dpk_,icomm,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + + else if (swap_sync) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + if (proc_to_comm < me) then + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + else if (proc_to_comm > me) then + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send .and. swap_recv) then + + ! First I post all the non blocking receives + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + call psb_get_rank(prcid(i),ictxt,proc_to_comm) + if ((nerv>0).and.(proc_to_comm /= me)) then + p2ptag = psb_dcomplex_swap_tag + call mpi_irecv(rcvbuf(rcv_pt),nerv,& + & psb_mpi_c_dpk_,prcid(i),& + & p2ptag, icomm,rvhd(i),iret) + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + ! Then I post all the blocking sends + if (usersend) call mpi_barrier(icomm,iret) + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_dcomplex_swap_tag + + if ((nesd>0).and.(proc_to_comm /= me)) then + if (usersend) then + call mpi_rsend(sndbuf(snd_pt),nesd,& + & psb_mpi_c_dpk_,prcid(i),& + & p2ptag,icomm,iret) + else + call mpi_send(sndbuf(snd_pt),nesd,& + & psb_mpi_c_dpk_,prcid(i),& + & p2ptag,icomm,iret) + end if + + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + end if + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + + pnti = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + + p2ptag = psb_dcomplex_swap_tag + + if ((proc_to_comm /= me).and.(nerv>0)) then + call mpi_wait(rvhd(i),p2pstat,iret) + if(iret /= mpi_success) then + ierr(1) = iret + info=psb_err_mpi_error_ + call psb_errpush(info,name,i_err=ierr) + goto 9999 + end if + else if (proc_to_comm == me) then + if (nesd /= nerv) then + write(psb_err_unit,*) & + & 'Fatal error in swapdata: mismatch on self send',& + & nerv,nesd + end if + rcvbuf(rcv_pt:rcv_pt+nerv-1) = sndbuf(snd_pt:snd_pt+nesd-1) + end if + pnti = pnti + nerv + nesd + 3 + end do + + + else if (swap_send) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nesd>0) call psb_snd(ictxt,& + & sndbuf(snd_pt:snd_pt+nesd-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + else if (swap_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + if (nerv>0) call psb_rcv(ictxt,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1), proc_to_comm) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (do_recv) then + + pnti = 1 + snd_pt = 1 + rcv_pt = 1 + do i=1, totxch + proc_to_comm = idx%v(pnti+psb_proc_id_) + nerv = idx%v(pnti+psb_n_elem_recv_) + nesd = idx%v(pnti+nerv+psb_n_elem_send_) + idx_pt = 1+pnti+psb_n_elem_recv_ + call y%sct(idx_pt,nerv,idx,& + & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta) + rcv_pt = rcv_pt + nerv + snd_pt = snd_pt + nesd + pnti = pnti + nerv + nesd + 3 + end do + + end if + + if (swap_mpi) then + deallocate(sdsz,rvsz,bsdidx,brvidx,rvhd,prcid,sdhd,& + & stat=info) + else + deallocate(rvhd,prcid,stat=info) + end if + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + if(albf) deallocate(sndbuf,rcvbuf,stat=info) + if(info /= psb_success_) then + call psb_errpush(psb_err_alloc_dealloc_,name) + goto 9999 + end if + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + if (err_act == psb_act_abort_) then + call psb_error(ictxt) + return + end if + return +end subroutine psi_zswap_vidx_vect + diff --git a/base/modules/Makefile b/base/modules/Makefile index f4871876..fb07c6b8 100644 --- a/base/modules/Makefile +++ b/base/modules/Makefile @@ -87,7 +87,7 @@ psi_mod.o: psb_penv_mod.o psb_desc_mod.o psi_serial_mod.o psb_serial_mod.o\ psb_desc_mod.o: psb_penv_mod.o psb_realloc_mod.o\ psb_hash_mod.o psb_hash_map_mod.o psb_list_map_mod.o \ psb_repl_map_mod.o psb_gen_block_map_mod.o psb_desc_const_mod.o\ - psb_indx_map_mod.o + psb_indx_map_mod.o psb_i_vect_mod.o psb_indx_map_mod.o: psb_desc_const_mod.o psb_error_mod.o psb_penv_mod.o psb_hash_map_mod.o psb_list_map_mod.o psb_repl_map_mod.o psb_gen_block_map_mod.o:\ psb_indx_map_mod.o psb_desc_const_mod.o \ diff --git a/base/modules/psb_c_base_vect_mod.f90 b/base/modules/psb_c_base_vect_mod.f90 index 321fd01e..c4428764 100644 --- a/base/modules/psb_c_base_vect_mod.f90 +++ b/base/modules/psb_c_base_vect_mod.f90 @@ -46,6 +46,7 @@ module psb_c_base_vect_mod use psb_const_mod use psb_error_mod + use psb_i_base_vect_mod !> \namespace psb_base_mod \class psb_c_base_vect_type @@ -141,9 +142,11 @@ module psb_c_base_vect_mod ! procedure, pass(x) :: gthab => c_base_gthab procedure, pass(x) :: gthzv => c_base_gthzv - generic, public :: gth => gthab, gthzv + procedure, pass(x) :: gthzv_x => c_base_gthzv_x + generic, public :: gth => gthab, gthzv, gthzv_x procedure, pass(y) :: sctb => c_base_sctb - generic, public :: sct => sctb + procedure, pass(y) :: sctb_x => c_base_sctb_x + generic, public :: sct => sctb, sctb_x end type psb_c_base_vect_type public :: psb_c_base_vect @@ -1042,6 +1045,26 @@ contains call psi_gth(n,idx,alpha,x%v,beta,y) end subroutine c_base_gthab + ! + ! shortcut alpha=1 beta=0 + ! + !> Function base_gthzv + !! \memberof psb_c_base_vect_type + !! \brief gather into an array special alpha=1 beta=0 + !! Y = X(IDX(:)) + !! \param n how many entries to consider + !! \param idx(:) indices + subroutine c_base_gthzv_x(i,n,idx,x,y) + use psi_serial_mod + integer(psb_ipk_) :: i,n + class(psb_i_base_vect_type) :: idx + complex(psb_spk_) :: y(:) + class(psb_c_base_vect_type) :: x + + call x%gth(n,idx%v(i:),y) + + end subroutine c_base_gthzv_x + ! ! shortcut alpha=1 beta=0 ! @@ -1087,4 +1110,15 @@ contains end subroutine c_base_sctb + subroutine c_base_sctb_x(i,n,idx,x,beta,y) + use psi_serial_mod + integer(psb_ipk_) :: i, n + class(psb_i_base_vect_type) :: idx + complex(psb_spk_) :: beta, x(:) + class(psb_c_base_vect_type) :: y + + call y%sct(n,idx%v(i:),x,beta) + + end subroutine c_base_sctb_x + end module psb_c_base_vect_mod diff --git a/base/modules/psb_c_tools_mod.f90 b/base/modules/psb_c_tools_mod.f90 index fbae86fd..b650f160 100644 --- a/base/modules/psb_c_tools_mod.f90 +++ b/base/modules/psb_c_tools_mod.f90 @@ -227,7 +227,7 @@ Module psb_c_tools_mod & psb_c_base_vect_type, psb_c_vect_type, & & psb_cspmat_type, psb_c_base_sparse_mat integer(psb_ipk_), intent(in) :: novr - Type(psb_cspmat_type), Intent(in) :: a + Type(psb_cspmat_type), Intent(inout) :: a Type(psb_desc_type), Intent(in), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info diff --git a/base/modules/psb_cd_tools_mod.f90 b/base/modules/psb_cd_tools_mod.f90 index 6595325e..db1f445a 100644 --- a/base/modules/psb_cd_tools_mod.f90 +++ b/base/modules/psb_cd_tools_mod.f90 @@ -73,9 +73,9 @@ module psb_cd_tools_mod implicit none !....parameters... - type(psb_desc_type), intent(in) :: desc_in - type(psb_desc_type), intent(out) :: desc_out - integer(psb_ipk_), intent(out) :: info + type(psb_desc_type), intent(inout) :: desc_in + type(psb_desc_type), intent(out) :: desc_out + integer(psb_ipk_), intent(out) :: info end subroutine psb_cdcpy end interface @@ -113,7 +113,7 @@ module psb_cd_tools_mod Subroutine psb_cd_lstext(desc_a,in_list,desc_ov,info, mask,extype) import :: psb_ipk_, psb_desc_type Implicit None - Type(psb_desc_type), Intent(in), target :: desc_a + Type(psb_desc_type), Intent(inout), target :: desc_a integer(psb_ipk_), intent(in) :: in_list(:) Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info @@ -144,11 +144,12 @@ module psb_cd_tools_mod end interface interface psb_icdasb - subroutine psb_icdasb(desc,info,ext_hv) - import :: psb_ipk_, psb_desc_type + subroutine psb_icdasb(desc,info,ext_hv, mold) + import :: psb_ipk_, psb_desc_type, psb_i_base_vect_type Type(psb_desc_type), intent(inout) :: desc integer(psb_ipk_), intent(out) :: info logical, intent(in),optional :: ext_hv + type(psb_i_base_vect_type), optional, intent(in) :: mold end subroutine psb_icdasb end interface @@ -200,12 +201,13 @@ contains end subroutine psb_get_boundary - subroutine psb_cdasb(desc,info) + subroutine psb_cdasb(desc,info,mold) Type(psb_desc_type), intent(inout) :: desc - integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), intent(out) :: info + type(psb_i_base_vect_type), optional, intent(in) :: mold - call psb_icdasb(desc,info,ext_hv=.false.) + call psb_icdasb(desc,info,ext_hv=.false.,mold=mold) end subroutine psb_cdasb end module psb_cd_tools_mod diff --git a/base/modules/psb_d_base_vect_mod.f90 b/base/modules/psb_d_base_vect_mod.f90 index d39ab97c..2de0fa2c 100644 --- a/base/modules/psb_d_base_vect_mod.f90 +++ b/base/modules/psb_d_base_vect_mod.f90 @@ -46,6 +46,7 @@ module psb_d_base_vect_mod use psb_const_mod use psb_error_mod + use psb_i_base_vect_mod !> \namespace psb_base_mod \class psb_d_base_vect_type @@ -141,9 +142,11 @@ module psb_d_base_vect_mod ! procedure, pass(x) :: gthab => d_base_gthab procedure, pass(x) :: gthzv => d_base_gthzv - generic, public :: gth => gthab, gthzv + procedure, pass(x) :: gthzv_x => d_base_gthzv_x + generic, public :: gth => gthab, gthzv, gthzv_x procedure, pass(y) :: sctb => d_base_sctb - generic, public :: sct => sctb + procedure, pass(y) :: sctb_x => d_base_sctb_x + generic, public :: sct => sctb, sctb_x end type psb_d_base_vect_type public :: psb_d_base_vect @@ -1042,6 +1045,26 @@ contains call psi_gth(n,idx,alpha,x%v,beta,y) end subroutine d_base_gthab + ! + ! shortcut alpha=1 beta=0 + ! + !> Function base_gthzv + !! \memberof psb_d_base_vect_type + !! \brief gather into an array special alpha=1 beta=0 + !! Y = X(IDX(:)) + !! \param n how many entries to consider + !! \param idx(:) indices + subroutine d_base_gthzv_x(i,n,idx,x,y) + use psi_serial_mod + integer(psb_ipk_) :: i,n + class(psb_i_base_vect_type) :: idx + real(psb_dpk_) :: y(:) + class(psb_d_base_vect_type) :: x + + call x%gth(n,idx%v(i:),y) + + end subroutine d_base_gthzv_x + ! ! shortcut alpha=1 beta=0 ! @@ -1087,4 +1110,15 @@ contains end subroutine d_base_sctb + subroutine d_base_sctb_x(i,n,idx,x,beta,y) + use psi_serial_mod + integer(psb_ipk_) :: i, n + class(psb_i_base_vect_type) :: idx + real(psb_dpk_) :: beta, x(:) + class(psb_d_base_vect_type) :: y + + call y%sct(n,idx%v(i:),x,beta) + + end subroutine d_base_sctb_x + end module psb_d_base_vect_mod diff --git a/base/modules/psb_d_tools_mod.f90 b/base/modules/psb_d_tools_mod.f90 index 3522bbb7..b19e1808 100644 --- a/base/modules/psb_d_tools_mod.f90 +++ b/base/modules/psb_d_tools_mod.f90 @@ -227,7 +227,7 @@ Module psb_d_tools_mod & psb_d_base_vect_type, psb_d_vect_type, & & psb_dspmat_type, psb_d_base_sparse_mat integer(psb_ipk_), intent(in) :: novr - Type(psb_dspmat_type), Intent(in) :: a + Type(psb_dspmat_type), Intent(inout) :: a Type(psb_desc_type), Intent(in), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info diff --git a/base/modules/psb_desc_mod.f90 b/base/modules/psb_desc_mod.f90 index da5fadf7..f8a4f94f 100644 --- a/base/modules/psb_desc_mod.f90 +++ b/base/modules/psb_desc_mod.f90 @@ -37,9 +37,11 @@ module psb_desc_mod use psb_const_mod - use psb_hash_mod use psb_desc_const_mod use psb_indx_map_mod + use psb_i_vect_mod +!!$ +!!$ use psb_hash_mod implicit none @@ -205,6 +207,12 @@ module psb_desc_mod integer(psb_ipk_), allocatable :: ovrlap_elem(:,:) integer(psb_ipk_), allocatable :: ovr_mst_idx(:) integer(psb_ipk_), allocatable :: bnd_elem(:) + + type(psb_i_vect_type) :: v_halo_index + type(psb_i_vect_type) :: v_ext_index + type(psb_i_vect_type) :: v_ovrlap_index + type(psb_i_vect_type) :: v_ovr_mst_idx + class(psb_indx_map), allocatable :: indxmap integer(psb_ipk_), allocatable :: lprm(:) type(psb_desc_type), pointer :: base_desc => null() @@ -224,7 +232,9 @@ module psb_desc_mod procedure, pass(desc) :: get_local_cols => psb_cd_get_local_cols procedure, pass(desc) :: get_global_rows => psb_cd_get_global_rows procedure, pass(desc) :: get_global_cols => psb_cd_get_global_cols - procedure, pass(desc) :: get_list => psb_cd_get_list + procedure, pass(desc) :: a_get_list => psb_cd_get_list + procedure, pass(desc) :: v_get_list => psb_cd_v_get_list + generic, public :: get_list => a_get_list, v_get_list procedure, pass(desc) :: sizeof => psb_cd_sizeof procedure, pass(desc) :: clone => psb_cd_clone procedure, pass(desc) :: free => psb_cdfree @@ -270,6 +280,10 @@ contains if (allocated(desc%lprm)) val = val + psb_sizeof_int*size(desc%lprm) if (allocated(desc%idx_space)) val = val + psb_sizeof_int*size(desc%idx_space) if (allocated(desc%indxmap)) val = val + desc%indxmap%sizeof() + val = val + desc%v_halo_index%sizeof() + val = val + desc%v_ext_index%sizeof() + val = val + desc%v_ovrlap_index%sizeof() + val = val + desc%v_ovr_mst_idx%sizeof() end function psb_cd_sizeof @@ -545,6 +559,19 @@ contains end subroutine psb_get_xch_idx + subroutine psb_get_v_xch_idx(idx,totxch,totsnd,totrcv) + implicit none + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(out) :: totxch,totsnd,totrcv + + integer(psb_ipk_) :: ip, nerv, nesd + character(len=20), parameter :: name='psb_get_v_xch_idx' + + call psb_get_xch_idx(idx%v,totxch,totsnd,totrcv) + + end subroutine psb_get_v_xch_idx + + subroutine psb_cd_get_list(data,desc,ipnt,totxch,idxr,idxs,info) use psb_const_mod @@ -614,6 +641,74 @@ contains return end subroutine psb_cd_get_list + + subroutine psb_cd_v_get_list(data,desc,ipnt,totxch,idxr,idxs,info) + use psb_const_mod + use psb_error_mod + use psb_penv_mod + implicit none + integer(psb_ipk_), intent(in) :: data + class(psb_i_base_vect_type), pointer :: ipnt + class(psb_desc_type), target :: desc + integer(psb_ipk_), intent(out) :: totxch,idxr,idxs,info + + !locals + integer(psb_ipk_) :: np,me,ictxt,err_act, debug_level,debug_unit + logical, parameter :: debug=.false.,debugprt=.false. + character(len=20), parameter :: name='psb_cd_v_get_list' + + info = psb_success_ + call psb_erractionsave(err_act) + debug_unit = psb_get_debug_unit() + debug_level = psb_get_debug_level() + + ictxt = psb_cd_get_context(desc) + + call psb_info(ictxt, me, np) + + select case(data) + case(psb_comm_halo_) + ipnt => desc%v_halo_index%v + case(psb_comm_ovr_) + ipnt => desc%v_ovrlap_index%v + case(psb_comm_ext_) + ipnt => desc%v_ext_index%v + if (debug_level >= psb_debug_ext_) then + if (.not.associated(desc%base_desc)) then + write(debug_unit,*) trim(name),& + & ': Warning: trying to get ext_index on a descriptor ',& + & 'which does not have a base_desc!' + end if + if (.not.psb_is_ovl_desc(desc)) then + write(debug_unit,*) trim(name),& + & ': Warning: trying to get ext_index on a descriptor ',& + & 'which is not overlap-extended!' + end if + end if + case(psb_comm_mov_) + ipnt => desc%v_ovr_mst_idx%v + case default + info=psb_err_from_subroutine_ + call psb_errpush(info,name,a_err='wrong Data selector') + goto 9999 + end select + call psb_get_v_xch_idx(ipnt,totxch,idxs,idxr) + + + call psb_erractionrestore(err_act) + return + +9999 continue + call psb_erractionrestore(err_act) + + if (err_act == psb_act_ret_) then + return + else + call psb_error(ictxt) + end if + return + end subroutine psb_cd_v_get_list + ! ! Subroutine: psb_cdfree ! Frees a descriptor data structure. @@ -957,6 +1052,14 @@ contains & call psb_safe_ab_cpy(desc%idx_space,desc_out%idx_space,info) if ((info == psb_success_).and.(allocated(desc%indxmap))) & & call desc%indxmap%clone(desc_out%indxmap,info) + if (info == psb_success_) & + & call desc%v_halo_index%clone(desc%v_halo_index,info) + if (info == psb_success_) & + & call desc%v_ext_index%clone(desc%v_ext_index,info) + if (info == psb_success_) & + & call desc%v_ovrlap_index%clone(desc%v_ovrlap_index,info) + if (info == psb_success_) & + & call desc%v_ovr_mst_idx%clone(desc%v_ovr_mst_idx,info) else call desc_out%free(info) end if diff --git a/base/modules/psb_s_base_vect_mod.f90 b/base/modules/psb_s_base_vect_mod.f90 index 506a75a2..5ac2078f 100644 --- a/base/modules/psb_s_base_vect_mod.f90 +++ b/base/modules/psb_s_base_vect_mod.f90 @@ -46,6 +46,7 @@ module psb_s_base_vect_mod use psb_const_mod use psb_error_mod + use psb_i_base_vect_mod !> \namespace psb_base_mod \class psb_s_base_vect_type @@ -141,9 +142,11 @@ module psb_s_base_vect_mod ! procedure, pass(x) :: gthab => s_base_gthab procedure, pass(x) :: gthzv => s_base_gthzv - generic, public :: gth => gthab, gthzv + procedure, pass(x) :: gthzv_x => s_base_gthzv_x + generic, public :: gth => gthab, gthzv, gthzv_x procedure, pass(y) :: sctb => s_base_sctb - generic, public :: sct => sctb + procedure, pass(y) :: sctb_x => s_base_sctb_x + generic, public :: sct => sctb, sctb_x end type psb_s_base_vect_type public :: psb_s_base_vect @@ -1042,6 +1045,26 @@ contains call psi_gth(n,idx,alpha,x%v,beta,y) end subroutine s_base_gthab + ! + ! shortcut alpha=1 beta=0 + ! + !> Function base_gthzv + !! \memberof psb_s_base_vect_type + !! \brief gather into an array special alpha=1 beta=0 + !! Y = X(IDX(:)) + !! \param n how many entries to consider + !! \param idx(:) indices + subroutine s_base_gthzv_x(i,n,idx,x,y) + use psi_serial_mod + integer(psb_ipk_) :: i,n + class(psb_i_base_vect_type) :: idx + real(psb_spk_) :: y(:) + class(psb_s_base_vect_type) :: x + + call x%gth(n,idx%v(i:),y) + + end subroutine s_base_gthzv_x + ! ! shortcut alpha=1 beta=0 ! @@ -1087,4 +1110,15 @@ contains end subroutine s_base_sctb + subroutine s_base_sctb_x(i,n,idx,x,beta,y) + use psi_serial_mod + integer(psb_ipk_) :: i, n + class(psb_i_base_vect_type) :: idx + real(psb_spk_) :: beta, x(:) + class(psb_s_base_vect_type) :: y + + call y%sct(n,idx%v(i:),x,beta) + + end subroutine s_base_sctb_x + end module psb_s_base_vect_mod diff --git a/base/modules/psb_s_tools_mod.f90 b/base/modules/psb_s_tools_mod.f90 index 9776f582..908832a8 100644 --- a/base/modules/psb_s_tools_mod.f90 +++ b/base/modules/psb_s_tools_mod.f90 @@ -227,7 +227,7 @@ Module psb_s_tools_mod & psb_s_base_vect_type, psb_s_vect_type, & & psb_sspmat_type, psb_s_base_sparse_mat integer(psb_ipk_), intent(in) :: novr - Type(psb_sspmat_type), Intent(in) :: a + Type(psb_sspmat_type), Intent(inout) :: a Type(psb_desc_type), Intent(in), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info diff --git a/base/modules/psb_z_base_vect_mod.f90 b/base/modules/psb_z_base_vect_mod.f90 index 7b84d47b..0d80b564 100644 --- a/base/modules/psb_z_base_vect_mod.f90 +++ b/base/modules/psb_z_base_vect_mod.f90 @@ -46,6 +46,7 @@ module psb_z_base_vect_mod use psb_const_mod use psb_error_mod + use psb_i_base_vect_mod !> \namespace psb_base_mod \class psb_z_base_vect_type @@ -141,9 +142,11 @@ module psb_z_base_vect_mod ! procedure, pass(x) :: gthab => z_base_gthab procedure, pass(x) :: gthzv => z_base_gthzv - generic, public :: gth => gthab, gthzv + procedure, pass(x) :: gthzv_x => z_base_gthzv_x + generic, public :: gth => gthab, gthzv, gthzv_x procedure, pass(y) :: sctb => z_base_sctb - generic, public :: sct => sctb + procedure, pass(y) :: sctb_x => z_base_sctb_x + generic, public :: sct => sctb, sctb_x end type psb_z_base_vect_type public :: psb_z_base_vect @@ -1042,6 +1045,26 @@ contains call psi_gth(n,idx,alpha,x%v,beta,y) end subroutine z_base_gthab + ! + ! shortcut alpha=1 beta=0 + ! + !> Function base_gthzv + !! \memberof psb_z_base_vect_type + !! \brief gather into an array special alpha=1 beta=0 + !! Y = X(IDX(:)) + !! \param n how many entries to consider + !! \param idx(:) indices + subroutine z_base_gthzv_x(i,n,idx,x,y) + use psi_serial_mod + integer(psb_ipk_) :: i,n + class(psb_i_base_vect_type) :: idx + complex(psb_dpk_) :: y(:) + class(psb_z_base_vect_type) :: x + + call x%gth(n,idx%v(i:),y) + + end subroutine z_base_gthzv_x + ! ! shortcut alpha=1 beta=0 ! @@ -1087,4 +1110,15 @@ contains end subroutine z_base_sctb + subroutine z_base_sctb_x(i,n,idx,x,beta,y) + use psi_serial_mod + integer(psb_ipk_) :: i, n + class(psb_i_base_vect_type) :: idx + complex(psb_dpk_) :: beta, x(:) + class(psb_z_base_vect_type) :: y + + call y%sct(n,idx%v(i:),x,beta) + + end subroutine z_base_sctb_x + end module psb_z_base_vect_mod diff --git a/base/modules/psb_z_tools_mod.f90 b/base/modules/psb_z_tools_mod.f90 index c9eb30a2..72091a9c 100644 --- a/base/modules/psb_z_tools_mod.f90 +++ b/base/modules/psb_z_tools_mod.f90 @@ -227,7 +227,7 @@ Module psb_z_tools_mod & psb_z_base_vect_type, psb_z_vect_type, & & psb_zspmat_type, psb_z_base_sparse_mat integer(psb_ipk_), intent(in) :: novr - Type(psb_zspmat_type), Intent(in) :: a + Type(psb_zspmat_type), Intent(inout) :: a Type(psb_desc_type), Intent(in), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info diff --git a/base/modules/psi_c_mod.f90 b/base/modules/psi_c_mod.f90 index 3ee17b5e..1e6d8e26 100644 --- a/base/modules/psi_c_mod.f90 +++ b/base/modules/psi_c_mod.f90 @@ -30,7 +30,7 @@ !!$ !!$ module psi_c_mod - use psb_desc_mod, only : psb_desc_type, psb_spk_, psb_ipk_ + use psb_desc_mod, only : psb_desc_type, psb_spk_, psb_ipk_, psb_i_base_vect_type use psb_c_vect_mod, only : psb_c_base_vect_type interface psi_swapdata @@ -90,6 +90,16 @@ module psi_c_mod complex(psb_spk_),target :: work(:) integer(psb_ipk_), intent(in) :: idx(:),totxch,totsnd,totrcv end subroutine psi_cswapidx_vect + subroutine psi_cswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + import :: psb_desc_type, psb_ipk_, psb_spk_, psb_c_base_vect_type, psb_i_base_vect_type + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_c_base_vect_type) :: y + complex(psb_spk_) :: beta + complex(psb_spk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + end subroutine psi_cswap_vidx_vect end interface diff --git a/base/modules/psi_d_mod.f90 b/base/modules/psi_d_mod.f90 index 05f45a3c..e4e88d48 100644 --- a/base/modules/psi_d_mod.f90 +++ b/base/modules/psi_d_mod.f90 @@ -30,8 +30,9 @@ !!$ !!$ module psi_d_mod - use psb_desc_mod, only : psb_desc_type, psb_ipk_, psb_dpk_ + use psb_desc_mod, only : psb_desc_type, psb_ipk_, psb_dpk_, psb_i_base_vect_type use psb_d_vect_mod, only : psb_d_base_vect_type + interface psi_swapdata subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info,data) import :: psb_desc_type, psb_ipk_, psb_dpk_, psb_d_base_vect_type @@ -89,6 +90,16 @@ module psi_d_mod real(psb_dpk_),target :: work(:) integer(psb_ipk_), intent(in) :: idx(:),totxch,totsnd,totrcv end subroutine psi_dswapidx_vect + subroutine psi_dswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + import :: psb_desc_type, psb_ipk_, psb_dpk_, psb_d_base_vect_type, psb_i_base_vect_type + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_d_base_vect_type) :: y + real(psb_dpk_) :: beta + real(psb_dpk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + end subroutine psi_dswap_vidx_vect end interface diff --git a/base/modules/psi_i_mod.f90 b/base/modules/psi_i_mod.f90 index a62573cb..af8f9b33 100644 --- a/base/modules/psi_i_mod.f90 +++ b/base/modules/psi_i_mod.f90 @@ -225,11 +225,12 @@ module psi_i_mod end interface interface psi_cnv_dsc - subroutine psi_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info) - import :: psb_desc_type, psb_ipk_ + subroutine psi_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) + import :: psb_desc_type, psb_ipk_, psb_i_base_vect_type integer(psb_ipk_), intent(in) :: halo_in(:), ovrlap_in(:),ext_in(:) type(psb_desc_type), intent(inout) :: cdesc integer(psb_ipk_), intent(out) :: info + class(psb_i_base_vect_type), optional, intent(in) :: mold end subroutine psi_cnv_dsc end interface diff --git a/base/modules/psi_s_mod.f90 b/base/modules/psi_s_mod.f90 index f293b169..05b73234 100644 --- a/base/modules/psi_s_mod.f90 +++ b/base/modules/psi_s_mod.f90 @@ -30,7 +30,7 @@ !!$ !!$ module psi_s_mod - use psb_desc_mod, only : psb_desc_type, psb_ipk_, psb_spk_ + use psb_desc_mod, only : psb_desc_type, psb_ipk_, psb_spk_, psb_i_base_vect_type use psb_s_vect_mod, only : psb_s_base_vect_type interface psi_swapdata @@ -90,6 +90,16 @@ module psi_s_mod real(psb_spk_),target :: work(:) integer(psb_ipk_), intent(in) :: idx(:),totxch,totsnd,totrcv end subroutine psi_sswapidx_vect + subroutine psi_sswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + import :: psb_desc_type, psb_ipk_, psb_spk_, psb_s_base_vect_type, psb_i_base_vect_type + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_s_base_vect_type) :: y + real(psb_spk_) :: beta + real(psb_spk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + end subroutine psi_sswap_vidx_vect end interface diff --git a/base/modules/psi_z_mod.f90 b/base/modules/psi_z_mod.f90 index 3dc1c3d0..91d81423 100644 --- a/base/modules/psi_z_mod.f90 +++ b/base/modules/psi_z_mod.f90 @@ -30,7 +30,7 @@ !!$ !!$ module psi_z_mod - use psb_desc_mod, only : psb_desc_type, psb_ipk_, psb_dpk_ + use psb_desc_mod, only : psb_desc_type, psb_ipk_, psb_dpk_, psb_i_base_vect_type use psb_z_vect_mod, only : psb_z_base_vect_type interface psi_swapdata @@ -90,6 +90,16 @@ module psi_z_mod complex(psb_dpk_),target :: work(:) integer(psb_ipk_), intent(in) :: idx(:),totxch,totsnd,totrcv end subroutine psi_zswapidx_vect + subroutine psi_zswap_vidx_vect(iictxt,iicomm,flag,beta,y,idx,totxch,totsnd,totrcv,work,info) + import :: psb_desc_type, psb_ipk_, psb_dpk_, psb_z_base_vect_type, psb_i_base_vect_type + integer(psb_ipk_), intent(in) :: iictxt,iicomm,flag + integer(psb_ipk_), intent(out) :: info + class(psb_z_base_vect_type) :: y + complex(psb_dpk_) :: beta + complex(psb_dpk_), target :: work(:) + class(psb_i_base_vect_type), intent(in) :: idx + integer(psb_ipk_), intent(in) :: totxch,totsnd, totrcv + end subroutine psi_zswap_vidx_vect end interface diff --git a/base/tools/psb_ccdbldext.F90 b/base/tools/psb_ccdbldext.F90 index ae92255b..9974a7a5 100644 --- a/base/tools/psb_ccdbldext.F90 +++ b/base/tools/psb_ccdbldext.F90 @@ -75,7 +75,7 @@ Subroutine psb_ccdbldext(a,desc_a,novr,desc_ov,info, extype) ! .. Array Arguments .. integer(psb_ipk_), intent(in) :: novr Type(psb_cspmat_type), Intent(in) :: a - Type(psb_desc_type), Intent(in), target :: desc_a + Type(psb_desc_type), Intent(inout), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info integer(psb_ipk_), intent(in),optional :: extype diff --git a/base/tools/psb_cd_lstext.f90 b/base/tools/psb_cd_lstext.f90 index 3d26a329..3cf99e78 100644 --- a/base/tools/psb_cd_lstext.f90 +++ b/base/tools/psb_cd_lstext.f90 @@ -37,7 +37,7 @@ Subroutine psb_cd_lstext(desc_a,in_list,desc_ov,info, mask,extype) Implicit None ! .. Array Arguments .. - Type(psb_desc_type), Intent(in), target :: desc_a + Type(psb_desc_type), Intent(inout), target :: desc_a integer(psb_ipk_), intent(in) :: in_list(:) Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info diff --git a/base/tools/psb_cdcpy.F90 b/base/tools/psb_cdcpy.F90 index 8f5dee72..3112139a 100644 --- a/base/tools/psb_cdcpy.F90 +++ b/base/tools/psb_cdcpy.F90 @@ -44,9 +44,9 @@ subroutine psb_cdcpy(desc_in, desc_out, info) implicit none !....parameters... - type(psb_desc_type), intent(in) :: desc_in - type(psb_desc_type), intent(out) :: desc_out - integer(psb_ipk_), intent(out) :: info + type(psb_desc_type), intent(inout) :: desc_in + type(psb_desc_type), intent(out) :: desc_out + integer(psb_ipk_), intent(out) :: info !locals integer(psb_ipk_) :: np,me,ictxt, err_act @@ -73,27 +73,7 @@ subroutine psb_cdcpy(desc_in, desc_out, info) goto 9999 endif -!!$ call psb_safe_ab_cpy(desc_in%matrix_data,desc_out%matrix_data,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%halo_index,desc_out%halo_index,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%ext_index,desc_out%ext_index,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%ovrlap_index,& - & desc_out%ovrlap_index,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%bnd_elem,desc_out%bnd_elem,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%ovrlap_elem,desc_out%ovrlap_elem,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%ovr_mst_idx,desc_out%ovr_mst_idx,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%lprm,desc_out%lprm,info) - if (info == psb_success_) call psb_safe_ab_cpy(desc_in%idx_space,desc_out%idx_space,info) - - if (allocated(desc_in%indxmap)) then - -#ifdef SOURCE_WORKAROUND - call desc_in%indxmap%clone(desc_out%indxmap,info) -#else - if (info == psb_success_)& - & allocate(desc_out%indxmap, source=desc_in%indxmap, stat=info) -#endif - - end if + call desc_in%clone(desc_out,info) if (info /= psb_success_) then info = psb_err_from_subroutine_ diff --git a/base/tools/psb_csphalo.F90 b/base/tools/psb_csphalo.F90 index 054e80ca..6dcb741e 100644 --- a/base/tools/psb_csphalo.F90 +++ b/base/tools/psb_csphalo.F90 @@ -274,8 +274,8 @@ Subroutine psb_csphalo(a,desc_a,blk,info,rowcnv,colcnv,& end if - call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_complex,& - & acoo%val,rvsz,brvindx,mpi_complex,icomm,minfo) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_spk_,icomm,minfo) call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& diff --git a/base/tools/psb_dcdbldext.F90 b/base/tools/psb_dcdbldext.F90 index d4420b15..497db424 100644 --- a/base/tools/psb_dcdbldext.F90 +++ b/base/tools/psb_dcdbldext.F90 @@ -75,7 +75,7 @@ Subroutine psb_dcdbldext(a,desc_a,novr,desc_ov,info, extype) ! .. Array Arguments .. integer(psb_ipk_), intent(in) :: novr Type(psb_dspmat_type), Intent(in) :: a - Type(psb_desc_type), Intent(in), target :: desc_a + Type(psb_desc_type), Intent(inout), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info integer(psb_ipk_), intent(in),optional :: extype diff --git a/base/tools/psb_dsphalo.F90 b/base/tools/psb_dsphalo.F90 index 7193d0ee..1d31d078 100644 --- a/base/tools/psb_dsphalo.F90 +++ b/base/tools/psb_dsphalo.F90 @@ -274,8 +274,8 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,& end if - call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_precision,& - & acoo%val,rvsz,brvindx,mpi_double_precision,icomm,minfo) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_dpk_,icomm,minfo) call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& diff --git a/base/tools/psb_icdasb.F90 b/base/tools/psb_icdasb.F90 index 59cc7883..7d46e322 100644 --- a/base/tools/psb_icdasb.F90 +++ b/base/tools/psb_icdasb.F90 @@ -42,7 +42,7 @@ ! coming from the build of an extended ! halo descriptor with respect to a normal call. ! -subroutine psb_icdasb(desc,info,ext_hv) +subroutine psb_icdasb(desc,info,ext_hv,mold) use psb_base_mod, psb_protect_name => psb_icdasb use psi_mod #ifdef MPI_MOD @@ -56,6 +56,7 @@ subroutine psb_icdasb(desc,info,ext_hv) type(psb_desc_type), intent(inout) :: desc integer(psb_ipk_), intent(out) :: info logical, intent(in), optional :: ext_hv + type(psb_i_base_vect_type), optional, intent(in) :: mold !....Locals.... integer(psb_ipk_) :: int_err(5) @@ -128,7 +129,7 @@ subroutine psb_icdasb(desc,info,ext_hv) if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': Final conversion' ! Then convert and put them back where they belong. - call psi_cnv_dsc(halo_index,ovrlap_index,ext_index,desc,info) + call psi_cnv_dsc(halo_index,ovrlap_index,ext_index,desc,info,mold=mold) if (info /= psb_success_) then call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_cnv_dsc') diff --git a/base/tools/psb_scdbldext.F90 b/base/tools/psb_scdbldext.F90 index 3880f507..9ecdafca 100644 --- a/base/tools/psb_scdbldext.F90 +++ b/base/tools/psb_scdbldext.F90 @@ -75,7 +75,7 @@ Subroutine psb_scdbldext(a,desc_a,novr,desc_ov,info, extype) ! .. Array Arguments .. integer(psb_ipk_), intent(in) :: novr Type(psb_sspmat_type), Intent(in) :: a - Type(psb_desc_type), Intent(in), target :: desc_a + Type(psb_desc_type), Intent(inout), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info integer(psb_ipk_), intent(in),optional :: extype diff --git a/base/tools/psb_ssphalo.F90 b/base/tools/psb_ssphalo.F90 index 780887bc..4fb6b910 100644 --- a/base/tools/psb_ssphalo.F90 +++ b/base/tools/psb_ssphalo.F90 @@ -274,8 +274,8 @@ Subroutine psb_ssphalo(a,desc_a,blk,info,rowcnv,colcnv,& end if - call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_real,& - & acoo%val,rvsz,brvindx,mpi_real,icomm,minfo) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_r_spk_,& + & acoo%val,rvsz,brvindx,psb_mpi_r_spk_,icomm,minfo) call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& diff --git a/base/tools/psb_zcdbldext.F90 b/base/tools/psb_zcdbldext.F90 index 207cff28..9eec49f6 100644 --- a/base/tools/psb_zcdbldext.F90 +++ b/base/tools/psb_zcdbldext.F90 @@ -75,7 +75,7 @@ Subroutine psb_zcdbldext(a,desc_a,novr,desc_ov,info, extype) ! .. Array Arguments .. integer(psb_ipk_), intent(in) :: novr Type(psb_zspmat_type), Intent(in) :: a - Type(psb_desc_type), Intent(in), target :: desc_a + Type(psb_desc_type), Intent(inout), target :: desc_a Type(psb_desc_type), Intent(out) :: desc_ov integer(psb_ipk_), intent(out) :: info integer(psb_ipk_), intent(in),optional :: extype diff --git a/base/tools/psb_zsphalo.F90 b/base/tools/psb_zsphalo.F90 index 467ecec4..dbd9c512 100644 --- a/base/tools/psb_zsphalo.F90 +++ b/base/tools/psb_zsphalo.F90 @@ -274,8 +274,8 @@ Subroutine psb_zsphalo(a,desc_a,blk,info,rowcnv,colcnv,& end if - call mpi_alltoallv(valsnd,sdsz,bsdindx,mpi_double_complex,& - & acoo%val,rvsz,brvindx,mpi_double_complex,icomm,minfo) + call mpi_alltoallv(valsnd,sdsz,bsdindx,psb_mpi_c_dpk_,& + & acoo%val,rvsz,brvindx,psb_mpi_c_dpk_,icomm,minfo) call mpi_alltoallv(iasnd,sdsz,bsdindx,psb_mpi_ipk_integer,& & acoo%ia,rvsz,brvindx,psb_mpi_ipk_integer,icomm,minfo) call mpi_alltoallv(jasnd,sdsz,bsdindx,psb_mpi_ipk_integer,&