diff --git a/base/internals/psi_adjcncy_fnd_owner.F90 b/base/internals/psi_adjcncy_fnd_owner.F90 index ed244554..f26bb38f 100644 --- a/base/internals/psi_adjcncy_fnd_owner.F90 +++ b/base/internals/psi_adjcncy_fnd_owner.F90 @@ -59,6 +59,7 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) use psb_error_mod use psb_penv_mod use psb_realloc_mod + use psb_timers_mod use psb_indx_map_mod, psb_protect_name => psi_adjcncy_fnd_owner #ifdef MPI_MOD use mpi @@ -85,10 +86,11 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) & last_ih, last_j, nidx, nrecv, nadj integer(psb_lpk_) :: mglob, ih integer(psb_ipk_) :: ictxt,np,me - logical, parameter :: gettime=.false., debug=.false. - logical, parameter :: a2av_impl=.true. - logical, parameter :: mpi_irecv_impl=.false. - logical, parameter :: psb_rcv_impl=.false. + logical, parameter :: gettime=.true., debug=.false. + integer(psb_mpk_) :: xchg_alg + logical, parameter :: do_timings=.false. + integer(psb_ipk_), save :: idx_phase1=-1, idx_phase2=-1, idx_phase3=-1 + integer(psb_ipk_), save :: idx_phase11=-1, idx_phase12=-1, idx_phase13=-1 real(psb_dpk_) :: t0, t1, t2, t3, t4, tamx, tidx character(len=20) :: name @@ -102,6 +104,19 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) n_row = idxmap%get_lr() n_col = idxmap%get_lc() iictxt = ictxt + if ((do_timings).and.(idx_phase1==-1)) & + & idx_phase1 = psb_get_timer_idx("ADJ_FND_OWN: phase1 ") + if ((do_timings).and.(idx_phase2==-1)) & + & idx_phase2 = psb_get_timer_idx("ADJ_FND_OWN: phase2") + if ((do_timings).and.(idx_phase3==-1)) & + & idx_phase3 = psb_get_timer_idx("ADJ_FND_OWN: phase3") + if ((do_timings).and.(idx_phase11==-1)) & + & idx_phase11 = psb_get_timer_idx("ADJ_FND_OWN: phase11 ") + if ((do_timings).and.(idx_phase12==-1)) & + & idx_phase12 = psb_get_timer_idx("ADJ_FND_OWN: phase12") + if ((do_timings).and.(idx_phase13==-1)) & + & idx_phase13 = psb_get_timer_idx("ADJ_FND_OWN: phase13") + call psb_info(ictxt, me, np) @@ -129,8 +144,11 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) end if iprc = -1 ! write(0,*) me,name,' Going through ',nidx,nadj + xchg_alg = psi_get_adj_alg() + select case(xchg_alg) + case(psi_adj_fnd_a2av_) + if (do_timings) call psb_tic(idx_phase1) - if (a2av_impl) then ! ! First simple minded version with auxiliary arrays ! dimensioned on NP. @@ -146,6 +164,7 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) ! ! First, send sizes according to adjcncy list ! + if (do_timings) call psb_tic(idx_phase11) sdsz = 0 do j=1, nadj sdsz(adj(j)) = nidx @@ -154,15 +173,20 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) call mpi_alltoall(sdsz,1,psb_mpi_mpk_,& & rvsz,1,psb_mpi_mpk_,icomm,minfo) + if (do_timings) call psb_toc(idx_phase11) + if (do_timings) call psb_tic(idx_phase12) rvidx(0) = 0 do i=0, np-1 rvidx(i+1) = rvidx(i) + rvsz(i) end do hsize = rvidx(np) + ! write(0,*)me,' Check on sizes from a2a:',hsize,rvsz(:) ! ! Second, allocate buffers and exchange data ! + if (do_timings) call psb_toc(idx_phase12) + if (do_timings) call psb_tic(idx_phase13) Allocate(rmtidx(hsize),lclidx(max(hsize,nidx*nadj)),& & tproc(max(hsize,nidx)),stat=info) @@ -173,7 +197,9 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) call mpi_alltoallv(idx,sdsz,sdidx,psb_mpi_lpk_,& & rmtidx,rvsz,rvidx,psb_mpi_lpk_,icomm,iret) - + if (do_timings) call psb_toc(idx_phase13) + if (do_timings) call psb_toc(idx_phase1) + if (do_timings) call psb_tic(idx_phase2) ! ! Third, compute local answers ! @@ -182,6 +208,8 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) tproc(i) = -1 if ((0 < lclidx(i)).and. (lclidx(i) <= n_row)) tproc(i) = me end do + if (do_timings) call psb_toc(idx_phase2) + if (do_timings) call psb_tic(idx_phase3) ! ! Fourth, exchange the answers @@ -203,10 +231,13 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) iprc(1:nidx) = max(iprc(1:nidx), lclidx(sdidx(i)+1:sdidx(i)+sdsz(i))) end if end do + if (do_timings) call psb_toc(idx_phase3) if (debug) write(0,*) me,' End of adjcncy_fnd ',iprc(1:nidx) - else if (mpi_irecv_impl) then + case(psi_adj_fnd_irecv_) + + if (do_timings) call psb_tic(idx_phase1) ! ! First simple minded version with auxiliary arrays ! dimensioned on NP. @@ -216,6 +247,7 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) Allocate(hidx(0:np),hsz(np),sdsz(0:np-1),rvsz(0:np-1),& & sdhd(0:np-1), rvhd(0:np-1), p2pstat(mpi_status_size,0:np-1),& & stat=info) + if (do_timings) call psb_tic(idx_phase11) sdhd(:) = mpi_request_null rvhd(:) = mpi_request_null ! @@ -255,6 +287,8 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) & p2ptag, icomm,rvhd(i),iret) end if end do + if (do_timings) call psb_toc(idx_phase11) + if (do_timings) call psb_tic(idx_phase12) do j=1, nadj if (nidx > 0) then prc = psb_get_mpi_rank(ictxt,adj(j)) @@ -265,7 +299,12 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) & p2ptag, icomm,iret) end if end do + if (do_timings) call psb_toc(idx_phase12) + if (do_timings) call psb_tic(idx_phase13) call mpi_waitall(np,rvhd,p2pstat,iret) + if (do_timings) call psb_toc(idx_phase13) + if (do_timings) call psb_toc(idx_phase1) + if (do_timings) call psb_tic(idx_phase2) ! ! Third, compute local answers @@ -275,6 +314,8 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) tproc(i) = -1 if ((0 < lclidx(i)).and. (lclidx(i) <= n_row)) tproc(i) = me end do + if (do_timings) call psb_toc(idx_phase2) + if (do_timings) call psb_tic(idx_phase3) ! ! At this point we can reuse lclidx to receive messages ! @@ -312,9 +353,10 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) do j = 1, nadj iprc(1:nidx) = max(iprc(1:nidx), lclidx((j-1)*nidx+1:(j-1)*nidx+nidx)) end do + if (do_timings) call psb_toc(idx_phase3) if (debug) write(0,*) me,' End of adjcncy_fnd ',iprc(1:nidx) - else if (psb_rcv_impl) then + case(psi_adj_fnd_pbrcv_) Allocate(hidx(0:np),hsz(np),& & sdsz(0:np-1),rvsz(0:np-1),stat=info) @@ -382,11 +424,11 @@ subroutine psi_adjcncy_fnd_owner(idx,iprc,adj,idxmap,info) if (nidx > 0) call psb_rcv(ictxt,tproc(1:nidx),adj(j)) iprc(1:nidx) = max(iprc(1:nidx), tproc(1:nidx)) end do - else + case default info = psb_err_internal_error_ call psb_errpush(info,name,a_err='invalid exchange alg choice') goto 9999 - end if + end select call psb_erractionrestore(err_act) return diff --git a/base/internals/psi_crea_index.f90 b/base/internals/psi_crea_index.f90 index ec783899..126d54e2 100644 --- a/base/internals/psi_crea_index.f90 +++ b/base/internals/psi_crea_index.f90 @@ -54,6 +54,7 @@ subroutine psi_i_crea_index(desc_a,index_in,index_out,nxch,nsnd,nrcv,info) use psb_desc_mod use psb_error_mod use psb_penv_mod + use psb_timers_mod use psi_mod, psb_protect_name => psi_i_crea_index implicit none @@ -69,6 +70,9 @@ subroutine psi_i_crea_index(desc_a,index_in,index_out,nxch,nsnd,nrcv,info) integer(psb_ipk_),parameter :: root=psb_root_,no_comm=-1 integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name + logical, parameter :: do_timings=.false. + integer(psb_ipk_), save :: idx_phase1=-1, idx_phase2=-1, idx_phase3=-1 + integer(psb_ipk_), save :: idx_phase11=-1, idx_phase12=-1, idx_phase13=-1 info = psb_success_ name='psi_crea_index' @@ -84,12 +88,26 @@ subroutine psi_i_crea_index(desc_a,index_in,index_out,nxch,nsnd,nrcv,info) call psb_errpush(info,name) goto 9999 endif + if ((do_timings).and.(idx_phase1==-1)) & + & idx_phase1 = psb_get_timer_idx("PSI_CREA_INDEX: phase1 ") + if ((do_timings).and.(idx_phase2==-1)) & + & idx_phase2 = psb_get_timer_idx("PSI_CREA_INDEX: phase2") + if ((do_timings).and.(idx_phase3==-1)) & + & idx_phase3 = psb_get_timer_idx("PSI_CREA_INDEX: phase3") +!!$ if ((do_timings).and.(idx_phase11==-1)) & +!!$ & idx_phase11 = psb_get_timer_idx("PSI_CREA_INDEX: phase11 ") +!!$ if ((do_timings).and.(idx_phase12==-1)) & +!!$ & idx_phase12 = psb_get_timer_idx("PSI_CREA_INDEX: phase12") +!!$ if ((do_timings).and.(idx_phase13==-1)) & +!!$ & idx_phase13 = psb_get_timer_idx("PSI_CREA_INDEX: phase13") + ! ...extract dependence list (ordered list of identifer process ! which every process must communcate with... if (debug_level >= psb_debug_inner_) & & write(debug_unit,*) me,' ',trim(name),': calling extract_dep_list' mode = 1 + if (do_timings) call psb_tic(idx_phase1) call psi_extract_dep_list(ictxt,& & desc_a%is_bld(), desc_a%is_upd(),& @@ -105,6 +123,8 @@ subroutine psi_i_crea_index(desc_a,index_in,index_out,nxch,nsnd,nrcv,info) ! ...now process root contains dependence list of all processes... if (debug_level >= psb_debug_inner_) & & write(debug_unit,*) me,' ',trim(name),': root sorting dep list' + if (do_timings) call psb_toc(idx_phase1) + if (do_timings) call psb_tic(idx_phase2) call psi_dl_check(dep_list,dl_lda,np,length_dl) @@ -114,6 +134,8 @@ subroutine psi_i_crea_index(desc_a,index_in,index_out,nxch,nsnd,nrcv,info) call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_sort_dl') goto 9999 end if + if (do_timings) call psb_toc(idx_phase2) + if (do_timings) call psb_tic(idx_phase3) if(debug_level >= psb_debug_inner_)& & write(debug_unit,*) me,' ',trim(name),': calling psi_desc_index' @@ -128,6 +150,7 @@ subroutine psi_i_crea_index(desc_a,index_in,index_out,nxch,nsnd,nrcv,info) call psb_errpush(psb_err_from_subroutine_,name,a_err='psi_desc_index') goto 9999 end if + if (do_timings) call psb_toc(idx_phase3) deallocate(dep_list,length_dl) if(debug_level >= psb_debug_inner_) & diff --git a/base/internals/psi_desc_impl.f90 b/base/internals/psi_desc_impl.f90 index b552a8ad..43eae7bb 100644 --- a/base/internals/psi_desc_impl.f90 +++ b/base/internals/psi_desc_impl.f90 @@ -63,6 +63,7 @@ end subroutine psi_renum_index subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) use psi_mod, psi_protect_name => psi_i_cnv_dsc + use psb_timers_mod use psb_realloc_mod implicit none @@ -82,6 +83,9 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) integer(psb_ipk_) :: debug_level, debug_unit logical, parameter :: debug=.false. character(len=20) :: name + logical, parameter :: do_timings=.false. + integer(psb_ipk_), save :: idx_phase1=-1, idx_phase2=-1, idx_phase3=-1 + integer(psb_ipk_), save :: idx_phase11=-1, idx_phase12=-1, idx_phase13=-1 name='psi_cnv_desc' call psb_get_erraction(err_act) @@ -97,7 +101,22 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) call psb_errpush(info,name) goto 9999 endif - + if ((do_timings).and.(idx_phase1==-1)) & + & idx_phase1 = psb_get_timer_idx("PSI_CNV_DSC: phase1 ") + if ((do_timings).and.(idx_phase2==-1)) & + & idx_phase2 = psb_get_timer_idx("PSI_CNV_DSC: phase2") + if ((do_timings).and.(idx_phase3==-1)) & + & idx_phase3 = psb_get_timer_idx("PSI_CNV_DSC: phase3") + if ((do_timings).and.(idx_phase11==-1)) & + & idx_phase11 = psb_get_timer_idx("PSI_CNV_DSC: phase11 ") + if ((do_timings).and.(idx_phase12==-1)) & + & idx_phase12 = psb_get_timer_idx("PSI_CNV_DSC: phase12") + if ((do_timings).and.(idx_phase13==-1)) & + & idx_phase13 = psb_get_timer_idx("PSI_CNV_DSC: phase13") + + + if (do_timings) call psb_tic(idx_phase1) + if (do_timings) call psb_tic(idx_phase11) ! first the halo index if (debug_level>0) write(debug_unit,*) me,'Calling crea_index on halo',& @@ -111,6 +130,8 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) if (debug_level>0) write(debug_unit,*) me,'Done crea_index on halo' if (debug_level>0) write(debug_unit,*) me,'Calling crea_index on ext' + if (do_timings) call psb_toc(idx_phase11) + if (do_timings) call psb_tic(idx_phase12) ! then ext index @@ -124,6 +145,8 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) if (debug_level>0) write(debug_unit,*) me,'Done crea_index on ext' if (debug_level>0) write(debug_unit,*) me,'Calling crea_index on ovrlap' + if (do_timings) call psb_toc(idx_phase12) + if (do_timings) call psb_tic(idx_phase13) ! then the overlap index call psi_crea_index(cdesc,ovrlap_in, idx_out,nxch,nsnd,nrcv,info) @@ -136,6 +159,9 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) call psb_errpush(psb_err_from_subroutine_,name,a_err='psb_move_alloc') goto 9999 end if + if (do_timings) call psb_toc(idx_phase13) + if (do_timings) call psb_toc(idx_phase1) + if (do_timings) call psb_tic(idx_phase2) ! next ovrlap_elem @@ -161,6 +187,8 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) call psb_errpush(psb_err_from_subroutine_,name,a_err='psb_move_alloc') goto 9999 end if + if (do_timings) call psb_toc(idx_phase2) + if (do_timings) call psb_tic(idx_phase3) ! finally bnd_elem call psi_crea_bnd_elem(idx_out,cdesc,info) @@ -177,7 +205,8 @@ subroutine psi_i_cnv_dsc(halo_in,ovrlap_in,ext_in,cdesc, info, mold) goto 9999 end if if (debug_level>0) write(debug_unit,*) me,'Done crea_bnd_elem' - + if (do_timings) call psb_toc(idx_phase3) + call psb_erractionrestore(err_act) return diff --git a/base/internals/psi_desc_index.F90 b/base/internals/psi_desc_index.F90 index 7f36c6ea..f23662f1 100644 --- a/base/internals/psi_desc_index.F90 +++ b/base/internals/psi_desc_index.F90 @@ -128,11 +128,15 @@ subroutine psi_i_desc_index(desc,index_in,dep_list,& integer(psb_mpk_),allocatable :: brvindx(:),rvsz(:),& & bsdindx(:),sdsz(:) + integer(psb_mpk_) :: proc_to_comm, p2ptag, p2pstat(mpi_status_size),& + & iret, sz + integer(psb_mpk_), allocatable :: prcid(:), rvhd(:) + integer(psb_ipk_) :: ihinsz,ntot,k,err_act,nidx,& - & idxr, idxs, iszs, iszr, nesd, nerv + & idxr, idxs, iszs, iszr, nesd, nerv, ixp, idx integer(psb_mpk_) :: icomm, minfo - logical,parameter :: usempi=.true. + logical, parameter :: usempi=.false. integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name @@ -255,12 +259,12 @@ subroutine psi_i_desc_index(desc,index_in,dep_list,& call desc%indxmap%l2g(index_in(i+1:i+nerv),& & sndbuf(bsdindx(proc+1)+1:bsdindx(proc+1)+nerv),& & info) - + if (info /= psb_success_) then call psb_errpush(psb_err_from_subroutine_,name,a_err='l2g') goto 9999 end if - + bsdindx(proc+1) = bsdindx(proc+1) + nerv i = i + nerv + 1 end do @@ -282,11 +286,76 @@ subroutine psi_i_desc_index(desc,index_in,dep_list,& idxr = idxr + rvsz(proc+1) end do - call mpi_alltoallv(sndbuf,sdsz,bsdindx,psb_mpi_lpk_,& - & rcvbuf,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) - if (minfo /= psb_success_) then - call psb_errpush(psb_err_from_subroutine_,name,a_err='mpi_alltoallv') - goto 9999 + if (usempi) then + call mpi_alltoallv(sndbuf,sdsz,bsdindx,psb_mpi_lpk_,& + & rcvbuf,rvsz,brvindx,psb_mpi_lpk_,icomm,minfo) + if (minfo /= psb_success_) then + call psb_errpush(psb_err_from_subroutine_,name,a_err='mpi_alltoallv') + goto 9999 + end if + else + if (.true.) then + allocate(prcid(length_dl),rvhd(length_dl)) + prcid = -1 + ixp = 1 + do i=1, length_dl + proc = dep_list(i) + prcid(ixp) = psb_get_mpi_rank(ictxt,proc) + sz = rvsz(proc+1) + if (sz > 0) then + p2ptag = psb_long_tag + idx = brvindx(proc+1) + call mpi_irecv(rcvbuf(idx+1:idx+sz),sz,& + & psb_mpi_lpk_, prcid(ixp), p2ptag, icomm,& + & rvhd(ixp),iret) + end if + ixp = ixp + 1 + end do + ixp = 1 + do i=1, length_dl + proc = dep_list(i) + prcid(ixp) = psb_get_mpi_rank(ictxt,proc) + sz = sdsz(proc+1) + if (sz > 0) then + p2ptag = psb_long_tag + idx = bsdindx(proc+1) + call mpi_send(sndbuf(idx+1:idx+sz),sz,& + & psb_mpi_lpk_, prcid(ixp), p2ptag, & + & icomm,iret) + end if + ixp = ixp + 1 + end do + ixp = 1 + do i=1, length_dl + proc = dep_list(i) + prcid(ixp) = psb_get_mpi_rank(ictxt,proc) + sz = rvsz(proc+1) + if (sz > 0) then + call mpi_wait(rvhd(ixp),p2pstat,iret) + end if + ixp = ixp + 1 + end do + else + + do i=1, length_dl + proc = dep_list(i) + sz = sdsz(proc+1) + idx = bsdindx(proc+1) + if (sz > 0) then + call psb_snd(ictxt,sndbuf(idx+1:idx+sz), proc) + end if + end do + + do i=1, length_dl + proc = dep_list(i) + sz = rvsz(proc+1) + idx = brvindx(proc+1) + if (sz > 0) then + call psb_rcv(ictxt,rcvbuf(idx+1:idx+sz),proc) + end if + end do + + end if end if ! @@ -327,7 +396,7 @@ subroutine psi_i_desc_index(desc,index_in,dep_list,& return 9999 call psb_error_handler(ictxt,err_act) - + return end subroutine psi_i_desc_index diff --git a/base/internals/psi_graph_fnd_owner.F90 b/base/internals/psi_graph_fnd_owner.F90 index 7e69c1ef..eb62d32d 100644 --- a/base/internals/psi_graph_fnd_owner.F90 +++ b/base/internals/psi_graph_fnd_owner.F90 @@ -107,7 +107,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info) integer(psb_ipk_) :: ictxt,np,me, nresp integer(psb_ipk_), parameter :: nt=4 integer(psb_ipk_) :: tmpv(4) - logical, parameter :: do_timings=.false., trace=.false. + logical, parameter :: do_timings=.true., trace=.false. integer(psb_ipk_), save :: idx_sweep0=-1, idx_loop_a2a=-1, idx_loop_neigh=-1 real(psb_dpk_) :: t0, t1, t2, t3, t4 character(len=20) :: name @@ -198,7 +198,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info) if (do_timings) call psb_toc(idx_sweep0) fnd_owner_loop: do while (nrest_max>0) - if (do_timings) call psb_tic(idx_loop_a2a) + if (do_timings) call psb_tic(idx_loop_a2a) ! ! The basic idea of this loop is to alternate between ! searching through all processes and searching @@ -209,6 +209,7 @@ subroutine psi_graph_fnd_owner(idx,iprc,idxmap,info) ! ! if (trace.and.(me == 0)) write(0,*) 'Looping in graph_fnd_owner: ', nrest_max nsampl_in = min(n_rest,max(1,(maxspace+np-1)/np)) + !nsampl_in = min(n_rest,32) ! ! Choose a sample, should it be done in this simplistic way? ! Note: nsampl_in is a hint, not an absolute, hence nsampl_out diff --git a/base/modules/desc/psb_indx_map_mod.f90 b/base/modules/desc/psb_indx_map_mod.f90 index 378b38af..373ef780 100644 --- a/base/modules/desc/psb_indx_map_mod.f90 +++ b/base/modules/desc/psb_indx_map_mod.f90 @@ -331,9 +331,42 @@ module psb_indx_map_mod end subroutine psi_symm_dep_list_norv end interface psi_symm_dep_list + integer(psb_mpk_), parameter :: psi_adj_fnd_irecv_ = 0 + integer(psb_mpk_), parameter :: psi_adj_fnd_a2av_ = 1 + integer(psb_mpk_), parameter :: psi_adj_fnd_pbrcv_ = 2 + integer(psb_mpk_), parameter :: psi_adj_alg_max_ = psi_adj_fnd_pbrcv_ + integer(psb_mpk_), save :: psi_adj_alg = psi_adj_fnd_irecv_ + + + contains + subroutine psi_set_adj_alg(ialg) + integer(psb_mpk_), intent(in) :: ialg + if ((ialg >=0) .and. (ialg <= psi_adj_alg_max_))& + & psi_adj_alg = ialg + end subroutine psi_set_adj_alg + function psi_get_adj_alg() result(val) + integer(psb_mpk_) :: val + val = psi_adj_alg + end function psi_get_adj_alg + + function psi_get_adj_alg_fmt() result(val) + character(len=20) :: val + select case(psi_adj_alg) + case(psi_adj_fnd_a2av_) + val = 'MPI_A2AV' + case(psi_adj_fnd_irecv_) + val = 'MPI_ISEND/IRECV' + case(psi_adj_fnd_pbrcv_) + val = 'PSB_SND/RCV' + case default + val = 'Unknown ?' + end select + end function psi_get_adj_alg_fmt + + !> !! \memberof psb_indx_map !! \brief Print a descriptive name diff --git a/base/tools/psb_icdasb.F90 b/base/tools/psb_icdasb.F90 index 7176d185..6b2ba199 100644 --- a/base/tools/psb_icdasb.F90 +++ b/base/tools/psb_icdasb.F90 @@ -65,6 +65,10 @@ subroutine psb_icdasb(desc,info,ext_hv,mold) integer(psb_ipk_) :: i, n_col, dectype, err_act, n_row integer(psb_mpk_) :: np,me, icomm, ictxt logical :: ext_hv_ + logical, parameter :: do_timings=.false. + integer(psb_ipk_), save :: idx_phase1=-1, idx_phase2=-1, idx_phase3=-1 + integer(psb_ipk_), save :: idx_phase11=-1, idx_phase12=-1, idx_phase13=-1 + integer(psb_ipk_), save :: idx_total=-1 integer(psb_ipk_) :: debug_level, debug_unit character(len=20) :: name @@ -81,7 +85,22 @@ subroutine psb_icdasb(desc,info,ext_hv,mold) n_row = desc%get_local_rows() n_col = desc%get_local_cols() icomm = desc%get_mpic() - + if ((do_timings).and.(idx_total==-1)) & + & idx_total = psb_get_timer_idx("ICDASB: total ") + if ((do_timings).and.(idx_phase1==-1)) & + & idx_phase1 = psb_get_timer_idx("ICDASB: phase1 ") + if ((do_timings).and.(idx_phase2==-1)) & + & idx_phase2 = psb_get_timer_idx("ICDASB: phase2") + if ((do_timings).and.(idx_phase3==-1)) & + & idx_phase3 = psb_get_timer_idx("ICDASB: phase3") +!!$ if ((do_timings).and.(idx_phase11==-1)) & +!!$ & idx_phase11 = psb_get_timer_idx("ICDASB: phase11 ") +!!$ if ((do_timings).and.(idx_phase12==-1)) & +!!$ & idx_phase12 = psb_get_timer_idx("ICDASB: phase12") +!!$ if ((do_timings).and.(idx_phase13==-1)) & +!!$ & idx_phase13 = psb_get_timer_idx("ICDASB: phase13") + + call psb_tic(idx_total) ! check on blacs grid call psb_info(ictxt, me, np) if (np == -1) then @@ -115,6 +134,7 @@ subroutine psb_icdasb(desc,info,ext_hv,mold) & write(debug_unit, *) me,' ',trim(name),': start' if (allocated(desc%indxmap)) then + if (do_timings) call psb_tic(idx_phase1) if (.not.ext_hv_) then call psi_bld_tmphalo(desc,info) if (info /= psb_success_) then @@ -122,7 +142,8 @@ subroutine psb_icdasb(desc,info,ext_hv,mold) goto 9999 end if end if - + if (do_timings) call psb_toc(idx_phase1) + if (do_timings) call psb_tic(idx_phase2) ! Take out the lists for ovrlap, halo and ext... call psb_move_alloc(desc%ovrlap_index,ovrlap_index,info) call psb_move_alloc(desc%halo_index,halo_index,info) @@ -144,6 +165,8 @@ subroutine psb_icdasb(desc,info,ext_hv,mold) call psb_errpush(info,name) goto 9999 end if + if (do_timings) call psb_toc(idx_phase2) + if (do_timings) call psb_tic(idx_phase3) call desc%indxmap%asb(info) if (info == psb_success_) then @@ -154,14 +177,14 @@ subroutine psb_icdasb(desc,info,ext_hv,mold) write(0,*) 'Error from internal indxmap asb ',info info = psb_success_ end if - + if (do_timings) call psb_toc(idx_phase3) else info = psb_err_invalid_cd_state_ call psb_errpush(info,name) goto 9999 endif - + call psb_toc(idx_total) if (debug_level >= psb_debug_ext_) & & write(debug_unit,*) me,' ',trim(name),': Done'